skywalking08 - 链路追踪tag查找配置(下)

本文涉及的产品
可观测链路 OpenTelemetry 版,每月50GB免费额度
简介: skywalking08 - 链路追踪tag查找配置(下)

skywalking08 - 链路追踪tag查找配置(下)

在上篇中,已经讲述了如何配置tag进行查找,以及一些坑点,比如H2数据库会导致无法查找。那这一篇主要讲讲代码实现,比较轻松,简单的crud。

查询入口方法

通过一个org.apache.skywalking.oap.query.graphql.resolver.TraceQuery#queryBasicTraces方法,来获取的查询服务进行查找

public TraceBrief queryBasicTraces(final TraceQueryCondition condition) throws IOException {
        long startSecondTB = 0;
        long endSecondTB = 0;
        String traceId = Const.EMPTY_STRING;
        if (!Strings.isNullOrEmpty(condition.getTraceId())) {
            // 有链路流水号
            traceId = condition.getTraceId();
        } else if (nonNull(condition.getQueryDuration())) {
            // 开始结束时间
            startSecondTB = condition.getQueryDuration().getStartTimeBucketInSec();
            endSecondTB = condition.getQueryDuration().getEndTimeBucketInSec();
        } else {
            throw new UnexpectedException("The condition must contains either queryDuration or traceId.");
        }
    // 各种条件的获取
        int minDuration = condition.getMinTraceDuration();
        int maxDuration = condition.getMaxTraceDuration();
        String endpointName = condition.getEndpointName();
        String endpointId = condition.getEndpointId();
        TraceState traceState = condition.getTraceState();
        QueryOrder queryOrder = condition.getQueryOrder();
        Pagination pagination = condition.getPaging();
    // 获取查询服务,进行查询Traces (一个方法这么多参数也是有点离谱)
        return getQueryService().queryBasicTraces(
            condition.getServiceId(), condition.getServiceInstanceId(), endpointId, traceId, endpointName, minDuration,
            maxDuration, traceState, queryOrder, pagination, startSecondTB, endSecondTB, condition.getTags()
        );
    }

获取查询Dao类进行查询

通过org.apache.skywalking.oap.server.core.query.TraceQueryService#queryBasicTraces来屏蔽底层数据源的不同

public TraceBrief queryBasicTraces(final String serviceId,
                                       final String serviceInstanceId,
                                       final String endpointId,
                                       final String traceId,
                                       final String endpointName,
                                       final int minTraceDuration,
                                       int maxTraceDuration,
                                       final TraceState traceState,
                                       final QueryOrder queryOrder,
                                       final Pagination paging,
                                       final long startTB,
                                       final long endTB,
                                       final List<Tag> tags) throws IOException {
        PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(paging);
    // 获取具体的数据源进行查询
        return getTraceQueryDAO().queryBasicTraces(
            startTB, endTB, minTraceDuration, maxTraceDuration, endpointName, serviceId, serviceInstanceId, endpointId,
            traceId, page.getLimit(), page.getFrom(), traceState, queryOrder, tags
        );
    }

可以看到具体的Dao的实现类的继承关系如下:

可以看到对应的数据库的选型对应有dao进行查询,像h2、mysql、influxdb、es等均有对应dao。我们选择es6的进行查看大致代码。

@Override
    public TraceBrief queryBasicTraces(long startSecondTB,
                                       long endSecondTB,
                                       long minDuration,
                                       long maxDuration,
                                       String endpointName,
                                       String serviceId,
                                       String serviceInstanceId,
                                       String endpointId,
                                       String traceId,
                                       int limit,
                                       int from,
                                       TraceState traceState,
                                       QueryOrder queryOrder,
                                       final List<Tag> tags) throws IOException {
        // 查询条件构建
        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        sourceBuilder.query(boolQueryBuilder);
        List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
        if (startSecondTB != 0 && endSecondTB != 0) {
            mustQueryList.add(QueryBuilders.rangeQuery(SegmentRecord.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
        }
        if (minDuration != 0 || maxDuration != 0) {
            RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(SegmentRecord.LATENCY);
            if (minDuration != 0) {
                rangeQueryBuilder.gte(minDuration);
            }
            if (maxDuration != 0) {
                rangeQueryBuilder.lte(maxDuration);
            }
            boolQueryBuilder.must().add(rangeQueryBuilder);
        }
        if (!Strings.isNullOrEmpty(endpointName)) {
            String matchCName = MatchCNameBuilder.INSTANCE.build(SegmentRecord.ENDPOINT_NAME);
            mustQueryList.add(QueryBuilders.matchPhraseQuery(matchCName, endpointName));
        }
        if (StringUtil.isNotEmpty(serviceId)) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.SERVICE_ID, serviceId));
        }
        if (StringUtil.isNotEmpty(serviceInstanceId)) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
        }
        if (!Strings.isNullOrEmpty(endpointId)) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.ENDPOINT_ID, endpointId));
        }
        if (!Strings.isNullOrEmpty(traceId)) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.TRACE_ID, traceId));
        }
        switch (traceState) {
            case ERROR:
                mustQueryList.add(QueryBuilders.matchQuery(SegmentRecord.IS_ERROR, BooleanUtils.TRUE));
                break;
            case SUCCESS:
                mustQueryList.add(QueryBuilders.matchQuery(SegmentRecord.IS_ERROR, BooleanUtils.FALSE));
                break;
        }
        switch (queryOrder) {
            case BY_START_TIME:
                sourceBuilder.sort(SegmentRecord.START_TIME, SortOrder.DESC);
                break;
            case BY_DURATION:
                sourceBuilder.sort(SegmentRecord.LATENCY, SortOrder.DESC);
                break;
        }
        if (CollectionUtils.isNotEmpty(tags)) {
            BoolQueryBuilder tagMatchQuery = QueryBuilders.boolQuery();
            // 将TAGS添加到查询条件中
            tags.forEach(tag -> tagMatchQuery.must(QueryBuilders.termQuery(SegmentRecord.TAGS, tag.toString())));
            mustQueryList.add(tagMatchQuery);
        }
        sourceBuilder.size(limit);
        sourceBuilder.from(from);
        // 执行查询
        SearchResponse response = getClient().search(new TimeRangeIndexNameMaker(SegmentRecord.INDEX_NAME, startSecondTB, endSecondTB), sourceBuilder);
        TraceBrief traceBrief = new TraceBrief();
        traceBrief.setTotal((int) response.getHits().totalHits);
        for (SearchHit searchHit : response.getHits().getHits()) {
            BasicTrace basicTrace = new BasicTrace();
            basicTrace.setSegmentId((String) searchHit.getSourceAsMap().get(SegmentRecord.SEGMENT_ID));
            basicTrace.setStart(String.valueOf(searchHit.getSourceAsMap().get(SegmentRecord.START_TIME)));
            basicTrace.getEndpointNames().add((String) searchHit.getSourceAsMap().get(SegmentRecord.ENDPOINT_NAME));
            basicTrace.setDuration(((Number) searchHit.getSourceAsMap().get(SegmentRecord.LATENCY)).intValue());
            basicTrace.setError(
                BooleanUtils.valueToBoolean(
                    ((Number) searchHit.getSourceAsMap().get(SegmentRecord.IS_ERROR)).intValue()
                )
            );
            basicTrace.getTraceIds().add((String) searchHit.getSourceAsMap().get(SegmentRecord.TRACE_ID));
            traceBrief.getTraces().add(basicTrace);
        }
        return traceBrief;
    }

总结

总体来说,这个接口即是将查询条件组装到查询语句中,最后将查询结果组装为trace对象,是个CRUD的接口,技术含量并不高。但是它用来接入多数据源、同数据源不同的版本的设计模式值得参考学习。

相关实践学习
基于OpenTelemetry构建全链路追踪与监控
本实验将带领您快速上手可观测链路OpenTelemetry版,包括部署并接入多语言应用、体验TraceId自动注入至日志以实现调用链与日志的关联查询、以及切换调用链透传协议以满足全链路打通的需求。
分布式链路追踪Skywalking
Skywalking是一个基于分布式跟踪的应用程序性能监控系统,用于从服务和云原生等基础设施中收集、分析、聚合以及可视化数据,提供了一种简便的方式来清晰地观测分布式系统,具有分布式追踪、性能指标分析、应用和服务依赖分析等功能。 分布式追踪系统发展很快,种类繁多,给我们带来很大的方便。但在数据采集过程中,有时需要侵入用户代码,并且不同系统的 API 并不兼容,这就导致了如果希望切换追踪系统,往往会带来较大改动。OpenTracing为了解决不同的分布式追踪系统 API 不兼容的问题,诞生了 OpenTracing 规范。OpenTracing 是一个轻量级的标准化层,它位于应用程序/类库和追踪或日志分析程序之间。Skywalking基于OpenTracing规范开发,具有性能好,支持多语言探针,无侵入性等优势,可以帮助我们准确快速的定位到线上故障和性能瓶颈。 在本套课程中,我们将全面的讲解Skywalking相关的知识。从APM系统、分布式调用链等基础概念的学习加深对Skywalking的理解,从0开始搭建一套完整的Skywalking环境,学会对各类应用进行监控,学习Skywalking常用插件。Skywalking原理章节中,将会对Skywalking使用的agent探针技术进行深度剖析,除此之外还会对OpenTracing规范作整体上的介绍。通过对本套课程的学习,不止能学会如何使用Skywalking,还将对其底层原理和分布式架构有更深的理解。本课程由黑马程序员提供。
目录
相关文章
|
存储 SQL Java
skywalking08 - 链路追踪tag查找配置(上)
skywalking08 - 链路追踪tag查找配置(上)
483 0
|
监控 网络协议 Java
分布式链路追踪- SkyWalking使用手册
分布式链路追踪- SkyWalking使用手册
1110 0
分布式链路追踪- SkyWalking使用手册
|
3月前
|
消息中间件 SpringCloudAlibaba Java
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(八)Config服务配置+bus消息总线+stream消息驱动+Sleuth链路追踪
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(八)Config服务配置+bus消息总线+stream消息驱动+Sleuth链路追踪
909 0
|
9月前
|
存储 监控 数据可视化
Golang链路追踪:实现高效可靠的分布式系统监控
Golang链路追踪:实现高效可靠的分布式系统监控
|
9月前
|
消息中间件 监控 安全
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(3)
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践
104 0
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(3)
|
9月前
|
消息中间件 Java Kafka
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(2)
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(2)
103 0
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(2)
|
9月前
|
消息中间件 Cloud Native Apache
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(1)
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践
70 0
RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践(1)
|
SpringCloudAlibaba 算法 Java
Spring Boot项目如何实现分布式日志链路追踪
作为一名后端开发工程师,排查系统问题用得最多的手段之一就是查看系统日志,在当下主要的分布式集群环境中一般使用ELK(Elasticsearch , Logstash, Kibana)来统一收集日志,以便后续查看日志定位追踪相关问题。但是在并发情况下,大量的系统用户即多线程并发访问后端服务导致同一个请求的日志记录不再是连续相邻的,此时多个请求的日志是一起串行输出到文件中,所以我们筛选出指定请求的全部相关日志还是比较麻烦的,同时当后端异步处理功能逻辑以及微服务的下游服务调用日志追踪也有着相同的问题。
|
消息中间件 数据可视化 JavaScript
什么是链路追踪?分布式系统如何实现链路追踪?
什么是链路追踪?分布式系统如何实现链路追踪?
|
存储 监控 NoSQL
【微服务】分布式如何利用Skywalking实现链路追踪与监控?
微服务下的分布式如何实现链路追踪和监控。
908 1
【微服务】分布式如何利用Skywalking实现链路追踪与监控?

热门文章

最新文章