来!做一个分钟级业务监控系统 【实战】

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 来!做一个分钟级业务监控系统 【实战】  如何做一个实时的业务统计的监控?比如分钟级?也就是每分钟可以快速看到业务的变化趋势,及可以做一些简单的分组查询?  哎,你可能说很简单了,直接从数据库 count 就可以了! 你是对的。

来!做一个分钟级业务监控系统 【实战】
  如何做一个实时的业务统计的监控?比如分钟级?也就是每分钟可以快速看到业务的变化趋势,及可以做一些简单的分组查询?

  哎,你可能说很简单了,直接从数据库 count 就可以了! 你是对的。

  但如果不允许你使用db进行count呢?因为线上数据库资源可是很宝贵的哦,你这一count可能会给db带来灾难了。

那不然咋整?

没有db,我们还有其他数据源嘛,比如: 消息队列?埋点数据? 本文将是基于该前提而行。

  

做监控,尽量不要侵入业务太多!所以有一个消息中间件是至关重要的。针对大数据系统,一般是: kafka 或者 类kafka. (如本文基础 loghub)

  有了消息中间件,如何进行分钟级监控? 这个应该就很简单了吧。不过如果要自己实现,其实坑也不少的!

如果自己实现计数,那么你可能需要做以下几件事:

  1. 每消费一个消息,你需要一个累加器;
  2. 每隔一个周期,你可能需要一个归档操作;
  3. 你可能需要考虑各种并发安全问题;
  4. 你可能需要考虑种性能问题;
  5. 你可能需要考虑各种机器故障问题;
  6. 你可能需要考虑各种边界值问题;

  哎,其实没那么难。时间序列数据库,就专门为这类事情而生!如OpenTSDB: http://opentsdb.net/overview.html

  可以说,TSDB 是这类应用场景的杀手锏。或者基于流计算框架: 如flink, 也是很轻松完成的事。但是不是本文的方向,略过!

本文是基于 loghub 的现有数据,进行分钟级统计后,入库 mysql 中,从而支持随时查询。(因loghub每次查询都是要钱的,所以,不可能直接查询)

  loghub 数据结构如: 2019-07-10 10:01:11,billNo,userId,productCode,...

  由于loghub提供了很多强大的查询统计功能,所以我们可以直接使用了。

  核心功能就是一个统计sql,还是比较简单的。但是需要考虑的点也不少,接下来,将为看官们奉上一个完整的解决方案!

撸代码去!

  1. 核心统计任务实现类 MinuteBizDataCounterTask
    复制代码

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogContent;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.common.QueriedLog;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.GetLogsResponse;
import com.my.service.statistics.StatisticsService;
import com.my.entity.BizDataStatisticsMin;
import com.my.model.LoghubQueryCounterOffsetModel;
import com.my.util.loghub.LogHubProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

/**

  • 基于loghub 的分钟级 统计任务
    */

@Component
@Slf4j
public class MinuteBizDataCounterTask implements Runnable {

@Resource
private LogHubProperties logHubProperties;

@Resource
private StatisticsService statisticsService;

@Resource(name = "defaultOffsetQueryTaskCallback")
private DefaultOffsetQueryTaskCallbackImpl defaultOffsetQueryTaskCallback;

/**
 * loghub 客户端
 */
private volatile Client mClient;

/**
 * 过滤的topic
 */
private static final String LOGHUB_TOPIC = "topic_test";

/**
 * 单次扫描loghub最大时间 间隔分钟数
 */
@Value("${loghub.offset.counter.perScanMaxMinutesGap}")
private Integer perScanMaxMinutesGap;

/**
 * 单次循环最大数
 */
@Value("${loghub.offset.counter.perScanMaxRecordsLimit}")
private Integer perScanMaxRecordsLimit;

/**
 * 构造必要实例信息
 */
public ProposalPolicyBizDataCounterTask() {

}

@Override
public void run() {
    if(mClient == null) {
        this.mClient = new Client(logHubProperties.getEndpoint(),
                            logHubProperties.getAccessKeyId(), logHubProperties.getAccessKey());
    }
    while (!Thread.interrupted()) {
        try {
            updateLastMinutePolicyNoCounter();
            Thread.sleep(60000);
        }
        catch (InterruptedException e) {
            log.error("【分钟级统计task】, sleep 中断", e);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            // 注意此处可能有风险,发生异常后将快速死循环
            log.error("【分钟级统计task】更新异常", e);
            try {
                Thread.sleep(10000);
            }
            catch (InterruptedException ex) {
                log.error("【分钟级统计task】异常,且sleep异常", ex);
                Thread.currentThread().interrupt();
            }
        }
    }
}

/**
 * 更新最近的数据 (分钟级)
 *
 * @throws LogException loghub查询异常时抛出
 */
private void updateLastMinutePolicyNoCounter() throws LogException {
    updateMinutePolicyNoCounter(null);
}

/**
 * 更新最近的数据
 */
public Integer updateMinutePolicyNoCounter(LoghubQueryCounterOffsetModel specifyOffset) throws LogException {
    // 1. 获取偏移量
    // 2. 根据偏移量,判定是否可以一次性取完,或者多次获取更新
    // 3. 从loghub中设置偏移量,获取统计数据,更新
    // 4. 更新db数据统计值
    // 5. 更新偏移量
    // 6. 等待下一次更新

    // 指定offset时,可能为补数据
    final LoghubQueryCounterOffsetModel destOffset = enhanceQueryOffset(specifyOffset);
    initSharedQueryOffset(destOffset, destOffset == specifyOffset);

    Integer totalAffectNum = 0;

    while (!isScanFinishOnDestination(destOffset)) {
        // 完整扫描一次时间周期
        calcNextSharedQueryOffset(destOffset);
        while (true) {
            calcNextInnerQueryOffset();
            ArrayList<QueriedLog> logs = queryPerMinuteStatisticFromLoghubOnCurrentOffset();
            Integer affectNum = handleMiniOffsetBatchCounter(logs);
            totalAffectNum += affectNum;
            log.info("【分钟级统计task】本次更新数据:{}, offset:{}", affectNum, getCurrentSharedQueryOffset());
            if(!hasMoreDataOffset(logs.size())) {
                rolloverOffsetAndCommit();
                break;
            }
        }
    }
    log.info("【分钟级统计task】本次更新数据,总共:{}, destOffset:{}, curOffset:{}",
                        totalAffectNum, destOffset, getCurrentSharedQueryOffset());
    rolloverOffsetAndCommit();
    return totalAffectNum;
}

/**
 * 处理一小批的统计数据
 *
 * @param logs 小批统计loghub数据
 * @return 影响行数
 */
private Integer handleMiniOffsetBatchCounter(ArrayList<QueriedLog> logs) {
    if (logs == null || logs.isEmpty()) {
        return 0;
    }
    List<BizDataStatisticsMin> statisticsMinList = new ArrayList<>();
    for (QueriedLog log1 : logs) {
        LogItem getLogItem = log1.GetLogItem();
        BizDataStatisticsMin statisticsMin1 = adaptStatisticsMinDbData(getLogItem);
        statisticsMin1.setEventCode(PROPOSAL_FOUR_IN_ONE_TOPIC);
        statisticsMin1.setEtlVersion(getCurrentScanTimeDuring() + ":" + statisticsMin1.getStatisticsCount());
        statisticsMinList.add(statisticsMin1);
    }
    return statisticsService.batchUpsertPremiumStatistics(statisticsMinList, getCurrentOffsetCallback());
}
/**
 * 获取共享偏移信息
 *
 * @return 偏移
 */
private LoghubQueryCounterOffsetModel getCurrentSharedQueryOffset() {
    return defaultOffsetQueryTaskCallback.getCurrentOffset();
}

/**
 * 判断本次是否扫描完成
 *
 * @param destOffset 目标偏移
 * @return true:扫描完成, false: 未完成
 */
private boolean isScanFinishOnDestination(LoghubQueryCounterOffsetModel destOffset) {
    return defaultOffsetQueryTaskCallback.getEndTime() >= destOffset.getEndTime();
}

/**
 * 获取偏移提交回调器
 *
 * @return 回调实例
 */
private OffsetQueryTaskCallback getCurrentOffsetCallback() {
    return defaultOffsetQueryTaskCallback;
}

/**
 * 初始化共享的查询偏移变量
 *
 * @param destOffset 目标偏移
 * @param isSpecifyOffset 是否是手动指定的偏移
 */
private void initSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset, boolean isSpecifyOffset) {
    // 整分花时间数据
    Integer queryStartTime = destOffset.getStartTime();
    if(queryStartTime % 60 != 0) {
        queryStartTime = queryStartTime / 60 * 60;
    }
    // 将目标扫描时间终点 设置为起点,以备后续迭代
    defaultOffsetQueryTaskCallback.initCurrentOffset(queryStartTime, queryStartTime,
                                                    destOffset.getOffsetStart(), destOffset.getLimit(),
                                                    destOffset.getIsNewStep(), isSpecifyOffset);
    if(defaultOffsetQueryTaskCallback.getIsNewStep()) {
        resetOffsetDefaultSettings();
    }
}

/**
 * 计算下一次统计偏移时间
 *
 * @param destOffset 目标偏移值
 */
private void calcNextSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset) {
    int perScanMaxSecondsGap = perScanMaxMinutesGap * 60;
    if(destOffset.getEndTime() - defaultOffsetQueryTaskCallback.getStartTime() > perScanMaxSecondsGap) {
        defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime());
        int nextExpectEndTime = defaultOffsetQueryTaskCallback.getStartTime() + perScanMaxSecondsGap;
        if(nextExpectEndTime > destOffset.getEndTime()) {
            nextExpectEndTime = destOffset.getEndTime();
        }
        defaultOffsetQueryTaskCallback.setEndTime(nextExpectEndTime);
    }
    else {
        defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime());
        defaultOffsetQueryTaskCallback.setEndTime(destOffset.getEndTime());
    }
    resetOffsetDefaultSettings();
}

/**
 * 重置偏移默认配置
 */
private void resetOffsetDefaultSettings() {
    defaultOffsetQueryTaskCallback.setIsNewStep(true);
    defaultOffsetQueryTaskCallback.setOffsetStart(0);
    defaultOffsetQueryTaskCallback.setLimit(0);
}

/**
 * 计算下一次小偏移,此种情况应对 一次外部偏移未查询完成的情况
 */
private void calcNextInnerQueryOffset() {
    defaultOffsetQueryTaskCallback.setIsNewStep(false);
    // 第一次计算时,limit 为0, 所以得出的 offsetStart 也是0
    defaultOffsetQueryTaskCallback.setOffsetStart(
            defaultOffsetQueryTaskCallback.getOffsetStart() + defaultOffsetQueryTaskCallback.getLimit());
    defaultOffsetQueryTaskCallback.setLimit(perScanMaxRecordsLimit);
}

/**
 * 获取当前循环的扫描区间
 *
 * @return 15567563433-1635345099 区间
 */
private String getCurrentScanTimeDuring() {
    return defaultOffsetQueryTaskCallback.getStartTime() + "-" + defaultOffsetQueryTaskCallback.getEndTime();
}

/**
 * 从loghub查询每分钟的统计信息
 *
 * @return 查询到的统计信息
 * @throws LogException loghub 异常时抛出
 */
private ArrayList<QueriedLog> queryPerMinuteStatisticFromLoghubOnCurrentOffset() throws LogException {
    // 先按保单号去重,再进行计数统计
    String countSql = "* | split(bizData, ',')[5] policyNo, bizData GROUP by split(bizData, ',')[5] " +
            " | select count(1) as totalCountMin, " +
            "split(bizData, ',')[2] as productCode," +
            "split(bizData, ',')[3] as schemaCode," +
            "split(bizData, ',')[4] as channelCode," +
            "substr(split(bizData, ',')[1], 1, 16) as myDateTimeMinute " +
            "group by substr(split(bizData, ',')[1], 1, 16), split(bizData, ',')[2],split(bizData, ',')[3], split(bizData, ',')[4],split(bizData, ',')[7], split(bizData, ',')[8]";
    countSql += " limit " + defaultOffsetQueryTaskCallback.getOffsetStart() + "," + defaultOffsetQueryTaskCallback.getLimit();
    GetLogsResponse countResponse = mClient.GetLogs(logHubProperties.getProjectName(), logHubProperties.getBizCoreDataLogStore(),
            defaultOffsetQueryTaskCallback.getStartTime(), defaultOffsetQueryTaskCallback.getEndTime(),
            LOGHUB_TOPIC, countSql);
    if(!countResponse.IsCompleted()) {
        log.error("【分钟级统计task】扫描获取到未完整的数据,请速检查原因,offSet:{}", getCurrentSharedQueryOffset());
    }
    return countResponse.GetLogs() == null
                ? new ArrayList<>()
                : countResponse.GetLogs();
}

/**
 * 根据上一次返回的记录数量,判断是否还有更多数据
 *
 * @param lastGotRecordsCount 上次返回的记录数 (数据量大于最大数说明还有未取完数据)
 * @return true: 是还有更多数据应该再循环获取, false: 无更多数据结束本期任务
 */
private boolean hasMoreDataOffset(int lastGotRecordsCount) {
    return lastGotRecordsCount >= perScanMaxRecordsLimit;
}

/**
 * 加强版的 offset 优先级: 指定偏移 -> 基于缓存的偏移 -> 新生成偏移标识
 *
 * @param specifyOffset 指定偏移(如有)
 * @return 偏移标识
 */
private LoghubQueryCounterOffsetModel enhanceQueryOffset(LoghubQueryCounterOffsetModel specifyOffset) {
    if(specifyOffset != null) {
        return specifyOffset;
    }
    LoghubQueryCounterOffsetModel offsetBaseOnCache = getNextOffsetBaseOnCache();
    if(offsetBaseOnCache != null) {
        return offsetBaseOnCache;
    }
    return generateNewOffset();
}

/**
 * 基于缓存获取一下偏移标识
 *
 * @return 偏移
 */
private LoghubQueryCounterOffsetModel getNextOffsetBaseOnCache() {
    LoghubQueryCounterOffsetModel offsetFromCache = defaultOffsetQueryTaskCallback.getCurrentOffsetFromCache();
    if(offsetFromCache == null) {
        return null;
    }
    LocalDateTime now = LocalDateTime.now();
    LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(),
                                                now.getHour(), now.getMinute());
    // 如果上次仍未内部循环完成,则使用原来的
    if(offsetFromCache.getIsNewStep()) {
        offsetFromCache.setStartTime(offsetFromCache.getEndTime());
        long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8"));
        offsetFromCache.setEndTime((int) endTime);
    }
    return offsetFromCache;
}

/**
 * 生成新的完整的 偏移标识
 *
 * @return 新偏移
 */
private LoghubQueryCounterOffsetModel generateNewOffset() {
    LoghubQueryCounterOffsetModel offsetNew = new LoghubQueryCounterOffsetModel();
    LocalDateTime now = LocalDateTime.now();
    LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(),
            now.getHour(), now.getMinute());
    long startTime = nowMinTime.minusDays(1).toEpochSecond(ZoneOffset.of("+8"));
    long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8"));
    offsetNew.setStartTime((int) startTime);
    offsetNew.setEndTime((int) endTime);
    return offsetNew;
}
/**
 * 将日志返回数据 适配到数据库记录中
 *
 * @param logItem 日志详情
 * @return db数据结构对应
 */
private BizDataStatisticsMin adaptStatisticsMinDbData(LogItem logItem) {
    ArrayList<LogContent> logContents = logItem.GetLogContents();
    BizDataStatisticsMin statisticsMin1 = new BizDataStatisticsMin();
    for (LogContent logContent : logContents) {
        switch (logContent.GetKey()) {
            case "totalCountMin":
                statisticsMin1.setStatisticsCount(Integer.valueOf(logContent.GetValue()));
                break;
            case "productCode":
                statisticsMin1.setProductCode(logContent.GetValue());
                break;
            case "myDateTimeMinute":
                String signDtMinStr = logContent.GetValue();
                String[] dateTimeArr = signDtMinStr.split(" ");
                String countDate = dateTimeArr[0];
                String[] timeArr = dateTimeArr[1].split(":");
                String countHour = timeArr[0];
                String countMin = timeArr[1];
                statisticsMin1.setCountDate(countDate);
                statisticsMin1.setCountHour(countHour);
                statisticsMin1.setCountMin(countMin);
                break;
            default:
                break;
        }
    }
    return statisticsMin1;
}

/**
 * 重置默认值,同时提交当前 (滚动到下一个偏移点)
 */
private void rolloverOffsetAndCommit() {
    resetOffsetDefaultSettings();
    commitOffsetSync();
}

/**
 * 提交偏移量
 *
 */
private void commitOffsetSync() {
    defaultOffsetQueryTaskCallback.commit();
}

}
复制代码
  主要实现逻辑如下:

    1. 每隔一分钟进行一个查询;
    2. 发生异常后,容错继续查询;
    3. 对于一个新统计,默认倒推一天范围进行统计;
    4. 统计时间范围间隔可设置,避免一次查询数量太大,费用太高且查询返回数量有限;
    5. 对于每次小批量查询,支持分布操作,直到取完数据;
    6. 小批量数据完成后,自动提交查询偏移;
    7. 后续查询将基础提交的偏移进行;
    8. 支持断点查询;

  1. 偏移提交管理器 OffsetQueryTaskCallback
      主任务中,只管进行数据统计查询,提交偏移操作由其他类进行;

复制代码
/**

  • 普通任务回调接口定义, 考虑到多种类型的统计任务偏移操作方式可能不一,定义一个通用型偏移接口
    *

*/
public interface OffsetQueryTaskCallback {

/**
 * 回调方法入口, 提交偏移
 */
public void commit();

/**
 * 设置初始化绑定当前偏移(期间不得改变)
 *
 * @param startTime 偏移开始时间
 * @param endTime 偏移结束时间
 * @param offsetStart 偏移开始值(分页)
 * @param limit 单次取值最大数(分页)
 * @param isNewStep 是否是新的查询
 * @param isSpecifyOffset 是否是指定的偏移
 */
public void initCurrentOffset(Integer startTime, Integer endTime,
                              Integer offsetStart, Integer limit,
                              Boolean isNewStep, Boolean isSpecifyOffset);

/**
 * 从当前环境中获取当前偏移信息
 *
 * @return 偏移变量实例
 */
public LoghubQueryCounterOffsetModel getCurrentOffset();

}

import com.alibaba.fastjson.JSONObject;
import com.my.util.constants.RedisKeysConstantEnum;
import com.my.util.redis.RedisPoolUtil;
import com.my.model.LoghubQueryCounterOffsetModel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**

  • 默认偏移回调实现
    *

*/
@Component("defaultOffsetQueryTaskCallback")
@Slf4j
public class DefaultOffsetQueryTaskCallbackImpl implements OffsetQueryTaskCallback {

@Resource
private RedisPoolUtil redisPoolUtil;

/**
 * 当前偏移信息
 */
private ThreadLocal<LoghubQueryCounterOffsetModel> currentOffsetHolder = new ThreadLocal<>();
@Override
public void commit() {
    if(!currentOffsetHolder.get().getIsSpecifyOffset()) {
        redisPoolUtil.set(RedisKeysConstantEnum.STATISTICS_COUNTER_OFFSET_CACHE_KEY.getRedisKey(),
                JSONObject.toJSONString(currentOffsetHolder.get()));
    }
}

@Override
public void initCurrentOffset(Integer startTime, Integer endTime,
                              Integer offsetStart, Integer limit,
                              Boolean isNewStep, Boolean isSpecifyOffset) {
    LoghubQueryCounterOffsetModel currentOffset = new LoghubQueryCounterOffsetModel();
    currentOffset.setStartTime(startTime);
    currentOffset.setEndTime(endTime);
    currentOffset.setOffsetStart(offsetStart);
    currentOffset.setIsNewStep(isNewStep);
    currentOffset.setIsSpecifyOffset(isSpecifyOffset);
    currentOffsetHolder.set(currentOffset);
}

@Override
public LoghubQueryCounterOffsetModel getCurrentOffset() {
    return currentOffsetHolder.get();
}

/**
 * 从缓存中获取当前偏移信息
 *
 * @return 缓存偏移或者 null
 */
public LoghubQueryCounterOffsetModel getCurrentOffsetFromCache() {
    String offsetCacheValue = redisPoolUtil.get(RedisKeysConstantEnum.STATISTICS_COUNTER_OFFSET_CACHE_KEY.getRedisKey());
    if (StringUtils.isBlank(offsetCacheValue)) {
        return null;
    }
    return JSONObject.parseObject(offsetCacheValue, LoghubQueryCounterOffsetModel.class);
}

public Integer getStartTime() {
    return currentOffsetHolder.get().getStartTime();
}

public void setStartTime(Integer startTime) {
    currentOffsetHolder.get().setStartTime(startTime);
}

public Integer getEndTime() {
    return currentOffsetHolder.get().getEndTime();
}

public void setEndTime(Integer endTime) {
    currentOffsetHolder.get().setEndTime(endTime);
}

public Integer getOffsetStart() {
    return currentOffsetHolder.get().getOffsetStart();
}

public void setOffsetStart(Integer offsetStart) {
    currentOffsetHolder.get().setOffsetStart(offsetStart);
}

public Integer getLimit() {
    return currentOffsetHolder.get().getLimit();
}

public void setLimit(Integer limit) {
    currentOffsetHolder.get().setLimit(limit);
}

public Boolean getIsNewStep() {
    return currentOffsetHolder.get().getIsNewStep();
}

public void setIsNewStep(Boolean isNewStep) {
    currentOffsetHolder.get().setIsNewStep(isNewStep);
}

}

/**

  • loghub 查询偏移量 数据容器
    *

*/
@Data
public class LoghubQueryCounterOffsetModel implements Serializable {

private static final long serialVersionUID = -3749552331349228045L;

/**
 * 开始时间
 */
private Integer startTime;

/**
 * 结束时间
 */
private Integer endTime;

/**
 * 起始偏移
 */
private Integer offsetStart = 0;

/**
 * 每次查询的 条数限制, 都需要进行设置后才可用, 否则查无数据
 */
private Integer limit = 0;

/**
 * 是否新的偏移循环,如未完成,应继续子循环 limit
 *
 * true: 是, offsetStart,limit 失效, false: 否, 需借助 offsetStart,limit 进行limit相加
 */
private Boolean isNewStep = true;

/**
 * 是否是手动指定的偏移,如果是说明是在手动被数据,偏移量将不会被更新
 *
 *      此变量是瞬时值,将不会被持久化到偏移标识中
 */
private transient Boolean isSpecifyOffset;

}
复制代码

  1. 批量更新统计结果数据库的实现
      因每次统计的数据量是不确定的,因尽可能早的提交一次统计结果,防止一次提交太多,或者 机器故障时所有统计白费,所以需要分小事务进行。

复制代码

@Service
public class StatisticsServiceImpl implements StatisticsService {

/**
 * 批量更新统计分钟级数据 (事务型提交)
 *
 * @param statisticsMinList 新统计数据
 * @return 影响行数
 */
@Transactional(isolation = Isolation.READ_COMMITTED, rollbackFor = Throwable.class)
public Integer batchUpsertPremiumStatistics(List<BizProposalPolicyStatisticsMin> statisticsMinList,
        OffsetQueryTaskCallback callback) {
    AtomicInteger updateCount = new AtomicInteger(0);
    statisticsMinList.forEach(item -> {
        int affectNum = 0;
        BizProposalPolicyStatisticsMin oldStatistics = bizProposalPolicyStasticsMinMapper.selectOneByCond(item);
        if (oldStatistics == null) {
            item.setEtlVersion(item.getEtlVersion() + ":0");
            affectNum = bizProposalPolicyStasticsMinMapper.insert(item);
        } else {
            oldStatistics.setStatisticsCount(oldStatistics.getStatisticsCount() + item.getStatisticsCount());
            String versionFull = versionKeeperFilter(oldStatistics.getEtlVersion(), item.getEtlVersion());
            oldStatistics.setEtlVersion(versionFull + ":" + oldStatistics.getStatisticsCount());
            // todo: 优化更新版本号问题
            affectNum = bizProposalPolicyStasticsMinMapper.updateByPrimaryKey(oldStatistics);
        }
        updateCount.addAndGet(affectNum);
    });
    callback.commit();
    return updateCount.get();
}

/**
 * 版本号过滤器(组装版本信息)
 *
 * @param oldVersion     老版本信息
 * @param currentVersion 当前版本号
 * @return 可用的版本信息
 */
private String versionKeeperFilter(String oldVersion, String currentVersion) {
    String versionFull = oldVersion + "," + currentVersion;
    if (versionFull.length() >= 500) {
        // 从150以后,第一版本号开始保留
        versionFull = versionFull.substring(versionFull.indexOf(',', 150));
    }
    return versionFull;
}

}
复制代码

  1. 你需要一个启动任务的地方
    复制代码

/**

  • 启动时运行的任务调度服务
    *

*/
@Service
@Slf4j
public class TaskAutoRunScheduleService {

@Resource
private MinuteBizDataCounterTask minuteBizDataCounterTask;

@PostConstruct
public void bizDataAutoRun() {
    log.info("============= bizDataAutoRun start =================");
    ExecutorService executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("Biz-data-counter-%d"));
    executorService.submit(minuteBizDataCounterTask);
}

}
复制代码

  1. 将每分钟的数据从db查询出来展示到页面
      以上将数据统计后以分钟级汇总到数据,接下来,监控页面就只需从db中进行简单聚合就可以了,咱们就不费那精力去展示了。
  1. 待完善的地方
      1. 集群环境的任务运行将会出问题,解决办法是:加一个分布式锁即可。 你可以的!

  2. 针对重试执行统计问题,还得再考虑考虑。(幂等性)
原文地址https://www.cnblogs.com/yougewe/p/11220586.html

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6天前
|
运维 监控 DataWorks
DataWorks 稳定性保障全解析:深入监控与资源调配
DataWorks 的稳定性保障体系涵盖精细监控与资源调配,确保企业数据业务高效、稳定运行。监控模块包括资源、任务和质量监控,及时预警并处理异常;资源调配策略则针对集成、调度、数据服务及计算资源进行科学配置,保障数据同步、任务优先级和高并发需求。通过全方位的监控和合理的资源配置,DataWorks 为企业筑牢数据根基,助力数字化转型。
37 10
|
2月前
|
监控 数据可视化 安全
系统业务监控软件
【10月更文挑战第17天】
52 6
|
5月前
|
存储 监控 开发者
分布式链路监控系统问题之实现应用级透明的问题如何解决
分布式链路监控系统问题之实现应用级透明的问题如何解决
|
6月前
|
数据采集 运维 监控
软件研发核心问题之用户行为采集容易出的问题如何解决
软件研发核心问题之用户行为采集容易出的问题如何解决
|
8月前
|
消息中间件 Cloud Native Java
项目环境稳定性指标建设之路
这篇文章讨论了项目环境在集团研发中的重要性,它是一个灵活的平台工具,用于支持联调测试和不同阶段的环境隔离。早期的项目环境管理存在任务重复运行、单机处理瓶颈和任务猝死等问题。为了解决这些问题,文章介绍了通过引入领域驱动设计(DDD)来重构流程引擎,创建了统一的异常处理和任务执行接口,增强了异常处理能力,并通过分布式分片任务、工厂模式和责任链模式实现了任务的分布式运行。此外,还使用分布式锁解决了多机忙等和任务重复执行的问题,提高了任务执行效率。优化后,环境创建成功率提升至99%以上,创建时间降低至100秒以下,系统异常率低于1%,并且能够应对更高的并发量。
|
8月前
|
弹性计算 运维 安全
提升云上资源稳定性的两大利器,事件驱动体系构建&自诊断工具
阿里云弹性计算团队十三位产品专家和技术专家共同分享云上运维深度实践,详细阐述如何利用CloudOps工具实现运维提效、弹性降本。
139 1
|
8月前
|
弹性计算 运维 监控
提升云上资源稳定性的两大利器:事件驱动体系构建&自诊断工具
阿里云弹性计算技术专家王子龙和阿里云弹性计算技术专家樊超在本次课程中带来了题为《提升云上资源稳定性的两大利器:事件驱动体系构建&自诊断工具》的主题分享, 课程涵盖基于事件构建可观测体系、基于事件的云上运维、ECS事件驱动最佳实践、使用ECS遇到故障时的痛点分析、一眼排障ECS健康状态、一键定位ECS健康诊断等内容。
|
运维 监控 搜索推荐
阿里云林小平:如何实现资源高效运维及成本分析
通过标签功能进行资源运维及精细化的权限管理,实现高效能、低成本的目标。
阿里云林小平:如何实现资源高效运维及成本分析
|
SQL 存储 数据采集
5分钟完成业务实时监控系统搭建,是一种什么样的体验?
道旅需要构建一个全面的指标监控系统,既包括系统的业务指标:如各类业务类型的请求数变化,不同供应商信息的变化,客户请求的明细大盘,各酒店请求量的排名变化,不同城市的订单转换率分析报表等;也包括系统的运行指标:如服务器请求响应时间, 带宽使用情况等。评估了市场上的监控产品之后,道旅选择了阿里云应用实时监控服务 ARMS。
3126 9
5分钟完成业务实时监控系统搭建,是一种什么样的体验?
|
XML Prometheus 监控
基于flowcharting实现定制化业务链路动态监控
flowcharting是grafana社区提供的一款插件,其借助开源绘图工具drawio可以实现定制化的的业务链路动态监控,将各项监控指标以更加面向业务的图表形式进行展示,可以实现网络拓扑图、流程图、架构图等等各种图形,将比较分散的指标统一成可视化的监控图形。
2796 0
基于flowcharting实现定制化业务链路动态监控

热门文章

最新文章