Learn Influxdb the hard way (3) - Services in Influxdb I

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介:

前言

在上一篇文章中我们梳理了Influxdb的代码主流程结构,从本篇开始将会对Influxdb中的各种Service进行分析,由于篇幅的限制,在这篇文章中会先介绍下MonitorService。在上文中,我们知道Influxdb是通过如下的方式注册Service。

    //cmd/influxd/run/server.go 371
    s.appendMonitorService()
    s.appendPrecreatorService(s.config.Precreator)
    s.appendSnapshotterService()
    s.appendContinuousQueryService(s.config.ContinuousQuery)
    s.appendHTTPDService(s.config.HTTPD)
    s.appendStorageService(s.config.Storage)
    s.appendRetentionPolicyService(s.config.Retention)

并通过如下的代码进行依次的调用的

    //cmd/influxd/run/server.go 434
    for _, service := range s.Services {
        if err := service.Open(); err != nil {
            return fmt.Errorf("open service: %s", err)
        }
    }

那么我们就可以从此处入手,剥离每个Service。

MonitorService解析

我们先来看下MonitorService的核心代码,可以发现MonitorService的初始化更多是在确保MonitorService是运行态的单例,并且注册了一些诊断的实例,最后判断配置是否支持存储,如果支持存储,会开启goroutine定期异步将数据存储在TSM Engine中。

//influxdb/monitor/service.go 73行
// New returns a new instance of the monitor system.
func New(r Reporter, c Config) *Monitor {
    return &Monitor{
        globalTags:           make(map[string]string),
        diagRegistrations:    make(map[string]diagnostics.Client),
        reporter:             r,
        storeEnabled:         c.StoreEnabled,
        storeDatabase:        c.StoreDatabase,
        storeInterval:        time.Duration(c.StoreInterval),
        storeRetentionPolicy: MonitorRetentionPolicy,
        Logger:               zap.NewNop(),
    }
}

// open returns whether the monitor service is open.
func (m *Monitor) open() bool {
    m.mu.Lock()
    defer m.mu.Unlock()
    return m.done != nil
}

// Open opens the monitoring system, using the given clusterID, node ID, and hostname
// for identification purpose.
func (m *Monitor) Open() error {
    if m.open() {
        m.Logger.Info("Monitor is already open")
        return nil
    }

    m.Logger.Info("Starting monitor service")

    // Self-register various stats and diagnostics.
    m.RegisterDiagnosticsClient("build", &build{
        Version: m.Version,
        Commit:  m.Commit,
        Branch:  m.Branch,
        Time:    m.BuildTime,
    })
    m.RegisterDiagnosticsClient("runtime", &goRuntime{})
    m.RegisterDiagnosticsClient("network", &network{})
    m.RegisterDiagnosticsClient("system", &system{})

    m.mu.Lock()
    m.done = make(chan struct{})
    m.mu.Unlock()

    // If enabled, record stats in a InfluxDB system.
    if m.storeEnabled {
        hostname, _ := os.Hostname()
        m.SetGlobalTag("hostname", hostname)

        // Start periodic writes to system.
        m.wg.Add(1)
        go m.storeStatistics()
    }

    return nil
}

下面重点看下storeStatistics中做的事情,storeStatistics中主要是通过Statistics方法获取stats,然后再通过writePoints方法写入到相应的store中。

//influxdb/monitor/service.go 418行
func (m *Monitor) storeStatistics() {
    defer m.wg.Done()
    m.Logger.Info("Storing statistics", logger.Database(m.storeDatabase), logger.RetentionPolicy(m.storeRetentionPolicy), logger.DurationLiteral("interval", m.storeInterval))

    // Wait until an even interval to start recording monitor statistics.
    // If we are interrupted before the interval for some reason, exit early.
    if err := m.waitUntilInterval(m.storeInterval); err != nil {
        return
    }

    tick := time.NewTicker(m.storeInterval)
    defer tick.Stop()

    for {
        select {
        case now := <-tick.C:
            now = now.Truncate(m.storeInterval)
            func() {
                m.mu.Lock()
                defer m.mu.Unlock()
                m.createInternalStorage()
            }()

            stats, err := m.Statistics(m.globalTags)
            if err != nil {
                m.Logger.Info("Failed to retrieve registered statistics", zap.Error(err))
                return
            }

            // Write all stats in batches
            batch := make(models.Points, 0, 5000)
            for _, s := range stats {
                pt, err := models.NewPoint(s.Name, models.NewTags(s.Tags), s.Values, now)
                if err != nil {
                    m.Logger.Info("Dropping point", zap.String("name", s.Name), zap.Error(err))
                    return
                }
                batch = append(batch, pt)
                if len(batch) == cap(batch) {
                    m.writePoints(batch)
                    batch = batch[:0]

                }
            }

            // Write the last batch
            if len(batch) > 0 {
                m.writePoints(batch)
            }
        case <-m.done:
            m.Logger.Info("Terminating storage of statistics")
            return
        }
    }
}

比较有趣的是Statistics方法不仅返回了[]*Statistic类型的监控指标,还通过expvar将数据通过API进行暴露,Go 标准库有一个 expvar 包。 这个包可以通过 JSON 格式的 HTTP API 公开您的应用程序和 Go 运行时的指标,有点类似prometheus的API 抓取的数据格式。而MonitorService的数据来源是通过runtime包获取。重要包含内存、goroutines等相关系统指标。

    //influxdb/monitor/service.go 324行
    statistic.Values = map[string]interface{}{
        "Alloc":        int64(rt.Alloc),
        "TotalAlloc":   int64(rt.TotalAlloc),
        "Sys":          int64(rt.Sys),
        "Lookups":      int64(rt.Lookups),
        "Mallocs":      int64(rt.Mallocs),
        "Frees":        int64(rt.Frees),
        "HeapAlloc":    int64(rt.HeapAlloc),
        "HeapSys":      int64(rt.HeapSys),
        "HeapIdle":     int64(rt.HeapIdle),
        "HeapInUse":    int64(rt.HeapInuse),
        "HeapReleased": int64(rt.HeapReleased),
        "HeapObjects":  int64(rt.HeapObjects),
        "PauseTotalNs": int64(rt.PauseTotalNs),
        "NumGC":        int64(rt.NumGC),
        "NumGoroutine": int64(runtime.NumGoroutine()),
    }

我们可以看下在TSM Engine中的存储的内容
1.jpg

细心的同学可能已经发现,在_internal库中还包含了很多其他的监控数据,那么这些数据是哪里来的呢,其实在上文中,我们忽略了一个重要的对象Reportor,在实例化Monitor的时候传递一个实现了Reporter接口实例r,Reporter接口定义了一个Statistics的方法,我们可以发现QueryExecutor,TSDBStore,PointsWriter,Subscriber等核心组件都实现了Statistics方法。

// cmd/influxd/run/server.go 210行
// Statistics returns statistics for the services running in the Server.
func (s *Server) Statistics(tags map[string]string) []models.Statistic {
    var statistics []models.Statistic
    statistics = append(statistics, s.QueryExecutor.Statistics(tags)...)
    statistics = append(statistics, s.TSDBStore.Statistics(tags)...)
    statistics = append(statistics, s.PointsWriter.Statistics(tags)...)
    statistics = append(statistics, s.Subscriber.Statistics(tags)...)
    for _, srv := range s.Services {
        if m, ok := srv.(monitor.Reporter); ok {
            statistics = append(statistics, m.Statistics(tags)...)
        }
    }
    return statistics
}

在gatherStatistics方法中会调用reporter,并将相应的数据与runtime等diagnoseClient的数据进行合并,并写入到Influxdb中。

//influxdb/monitor/service.go 347行
func (m *Monitor) gatherStatistics(statistics []*Statistic, tags map[string]string) []*Statistic {
   m.mu.RLock()
   defer m.mu.RUnlock()

   if m.reporter != nil {
      for _, s := range m.reporter.Statistics(tags) {
         statistics = append(statistics, &Statistic{Statistic: s})
      }
   }
   return statistics
}

总结

到此Influxdb的监控服务部分就比较清晰了,首先监控分为系统级别的监控和组件级别的监控,系统级别的监控会通过runtime的方式获取,组件级别的监控会通过各自组件实现Reportor接口进行实现,Influxdb会定期的存储监控数据到TSM Engine的_internal库中。同样Influxdb也会通过expvar将监控数据暴露到API上。

目录
相关文章
Query Performance Optimization at Alibaba Cloud Log Analytics Service
PrestoCon Day 2023,链接:https://prestoconday2023.sched.com/event/1Mjdc?iframe=no首页自我介绍,分享题目概要各个性能优化项能够优化的资源类别limit快速短路有什么优点?有啥特征?进一步的优化空间?避免不必要块的生成逻辑单元分布式执行,global 阶段的算子哪些字段无需输出?公共子表达式结合FilterNode和Proje
Query Performance Optimization at Alibaba Cloud Log Analytics Service
|
机器学习/深度学习 Ubuntu iOS开发
【Elastic Engineering】Beats:解密 Filebeat 中的 setup 命令
这个步骤非常重要,但是描述的内容并不是很多。为什么需要这个步骤呢?它到底能够做什么呢?
574 0
【Elastic Engineering】Beats:解密 Filebeat 中的 setup 命令
|
存储 API 索引
【Elastic Engineering】Elasticsearch:Cluster 备份 Snapshot 及 Restore API
Elasticsearch:Cluster 备份 Snapshot 及 Restore API
265 0
【Elastic Engineering】Elasticsearch:Cluster 备份 Snapshot 及 Restore API
|
存储 测试技术 API
【Elastic Engineering】Elasticsearch:Runtime fields 入门, Elastic 的 schema on read 实现 - 7.11 发布
Elasticsearch:Runtime fields 入门, Elastic 的 schema on read 实现 - 7.11 发布
211 0
【Elastic Engineering】Elasticsearch:Runtime fields 入门, Elastic 的 schema on read 实现 - 7.11 发布
|
Web App开发 存储 人工智能
|
存储 API 索引
|
索引
|
存储 Java 开发者
|
5G API 索引
|
数据可视化 API 索引
【Elastic Engineering】Elasticsearch:创建 Runtime field 并在 Kibana 中使用它 - 7.11 发布
Elasticsearch:创建 Runtime field 并在 Kibana 中使用它 - 7.11 发布
359 0
【Elastic Engineering】Elasticsearch:创建 Runtime field 并在 Kibana 中使用它 - 7.11 发布