Learn Influxdb the hard way (3) - Services in Influxdb I

简介:

前言

在上一篇文章中我们梳理了Influxdb的代码主流程结构,从本篇开始将会对Influxdb中的各种Service进行分析,由于篇幅的限制,在这篇文章中会先介绍下MonitorService。在上文中,我们知道Influxdb是通过如下的方式注册Service。

    //cmd/influxd/run/server.go 371
    s.appendMonitorService()
    s.appendPrecreatorService(s.config.Precreator)
    s.appendSnapshotterService()
    s.appendContinuousQueryService(s.config.ContinuousQuery)
    s.appendHTTPDService(s.config.HTTPD)
    s.appendStorageService(s.config.Storage)
    s.appendRetentionPolicyService(s.config.Retention)

并通过如下的代码进行依次的调用的

    //cmd/influxd/run/server.go 434
    for _, service := range s.Services {
        if err := service.Open(); err != nil {
            return fmt.Errorf("open service: %s", err)
        }
    }

那么我们就可以从此处入手,剥离每个Service。

MonitorService解析

我们先来看下MonitorService的核心代码,可以发现MonitorService的初始化更多是在确保MonitorService是运行态的单例,并且注册了一些诊断的实例,最后判断配置是否支持存储,如果支持存储,会开启goroutine定期异步将数据存储在TSM Engine中。

//influxdb/monitor/service.go 73行
// New returns a new instance of the monitor system.
func New(r Reporter, c Config) *Monitor {
    return &Monitor{
        globalTags:           make(map[string]string),
        diagRegistrations:    make(map[string]diagnostics.Client),
        reporter:             r,
        storeEnabled:         c.StoreEnabled,
        storeDatabase:        c.StoreDatabase,
        storeInterval:        time.Duration(c.StoreInterval),
        storeRetentionPolicy: MonitorRetentionPolicy,
        Logger:               zap.NewNop(),
    }
}

// open returns whether the monitor service is open.
func (m *Monitor) open() bool {
    m.mu.Lock()
    defer m.mu.Unlock()
    return m.done != nil
}

// Open opens the monitoring system, using the given clusterID, node ID, and hostname
// for identification purpose.
func (m *Monitor) Open() error {
    if m.open() {
        m.Logger.Info("Monitor is already open")
        return nil
    }

    m.Logger.Info("Starting monitor service")

    // Self-register various stats and diagnostics.
    m.RegisterDiagnosticsClient("build", &build{
        Version: m.Version,
        Commit:  m.Commit,
        Branch:  m.Branch,
        Time:    m.BuildTime,
    })
    m.RegisterDiagnosticsClient("runtime", &goRuntime{})
    m.RegisterDiagnosticsClient("network", &network{})
    m.RegisterDiagnosticsClient("system", &system{})

    m.mu.Lock()
    m.done = make(chan struct{})
    m.mu.Unlock()

    // If enabled, record stats in a InfluxDB system.
    if m.storeEnabled {
        hostname, _ := os.Hostname()
        m.SetGlobalTag("hostname", hostname)

        // Start periodic writes to system.
        m.wg.Add(1)
        go m.storeStatistics()
    }

    return nil
}

下面重点看下storeStatistics中做的事情,storeStatistics中主要是通过Statistics方法获取stats,然后再通过writePoints方法写入到相应的store中。

//influxdb/monitor/service.go 418行
func (m *Monitor) storeStatistics() {
    defer m.wg.Done()
    m.Logger.Info("Storing statistics", logger.Database(m.storeDatabase), logger.RetentionPolicy(m.storeRetentionPolicy), logger.DurationLiteral("interval", m.storeInterval))

    // Wait until an even interval to start recording monitor statistics.
    // If we are interrupted before the interval for some reason, exit early.
    if err := m.waitUntilInterval(m.storeInterval); err != nil {
        return
    }

    tick := time.NewTicker(m.storeInterval)
    defer tick.Stop()

    for {
        select {
        case now := <-tick.C:
            now = now.Truncate(m.storeInterval)
            func() {
                m.mu.Lock()
                defer m.mu.Unlock()
                m.createInternalStorage()
            }()

            stats, err := m.Statistics(m.globalTags)
            if err != nil {
                m.Logger.Info("Failed to retrieve registered statistics", zap.Error(err))
                return
            }

            // Write all stats in batches
            batch := make(models.Points, 0, 5000)
            for _, s := range stats {
                pt, err := models.NewPoint(s.Name, models.NewTags(s.Tags), s.Values, now)
                if err != nil {
                    m.Logger.Info("Dropping point", zap.String("name", s.Name), zap.Error(err))
                    return
                }
                batch = append(batch, pt)
                if len(batch) == cap(batch) {
                    m.writePoints(batch)
                    batch = batch[:0]

                }
            }

            // Write the last batch
            if len(batch) > 0 {
                m.writePoints(batch)
            }
        case <-m.done:
            m.Logger.Info("Terminating storage of statistics")
            return
        }
    }
}

比较有趣的是Statistics方法不仅返回了[]*Statistic类型的监控指标,还通过expvar将数据通过API进行暴露,Go 标准库有一个 expvar 包。 这个包可以通过 JSON 格式的 HTTP API 公开您的应用程序和 Go 运行时的指标,有点类似prometheus的API 抓取的数据格式。而MonitorService的数据来源是通过runtime包获取。重要包含内存、goroutines等相关系统指标。

    //influxdb/monitor/service.go 324行
    statistic.Values = map[string]interface{}{
        "Alloc":        int64(rt.Alloc),
        "TotalAlloc":   int64(rt.TotalAlloc),
        "Sys":          int64(rt.Sys),
        "Lookups":      int64(rt.Lookups),
        "Mallocs":      int64(rt.Mallocs),
        "Frees":        int64(rt.Frees),
        "HeapAlloc":    int64(rt.HeapAlloc),
        "HeapSys":      int64(rt.HeapSys),
        "HeapIdle":     int64(rt.HeapIdle),
        "HeapInUse":    int64(rt.HeapInuse),
        "HeapReleased": int64(rt.HeapReleased),
        "HeapObjects":  int64(rt.HeapObjects),
        "PauseTotalNs": int64(rt.PauseTotalNs),
        "NumGC":        int64(rt.NumGC),
        "NumGoroutine": int64(runtime.NumGoroutine()),
    }

我们可以看下在TSM Engine中的存储的内容
1.jpg

细心的同学可能已经发现,在_internal库中还包含了很多其他的监控数据,那么这些数据是哪里来的呢,其实在上文中,我们忽略了一个重要的对象Reportor,在实例化Monitor的时候传递一个实现了Reporter接口实例r,Reporter接口定义了一个Statistics的方法,我们可以发现QueryExecutor,TSDBStore,PointsWriter,Subscriber等核心组件都实现了Statistics方法。

// cmd/influxd/run/server.go 210行
// Statistics returns statistics for the services running in the Server.
func (s *Server) Statistics(tags map[string]string) []models.Statistic {
    var statistics []models.Statistic
    statistics = append(statistics, s.QueryExecutor.Statistics(tags)...)
    statistics = append(statistics, s.TSDBStore.Statistics(tags)...)
    statistics = append(statistics, s.PointsWriter.Statistics(tags)...)
    statistics = append(statistics, s.Subscriber.Statistics(tags)...)
    for _, srv := range s.Services {
        if m, ok := srv.(monitor.Reporter); ok {
            statistics = append(statistics, m.Statistics(tags)...)
        }
    }
    return statistics
}

在gatherStatistics方法中会调用reporter,并将相应的数据与runtime等diagnoseClient的数据进行合并,并写入到Influxdb中。

//influxdb/monitor/service.go 347行
func (m *Monitor) gatherStatistics(statistics []*Statistic, tags map[string]string) []*Statistic {
   m.mu.RLock()
   defer m.mu.RUnlock()

   if m.reporter != nil {
      for _, s := range m.reporter.Statistics(tags) {
         statistics = append(statistics, &Statistic{Statistic: s})
      }
   }
   return statistics
}

总结

到此Influxdb的监控服务部分就比较清晰了,首先监控分为系统级别的监控和组件级别的监控,系统级别的监控会通过runtime的方式获取,组件级别的监控会通过各自组件实现Reportor接口进行实现,Influxdb会定期的存储监控数据到TSM Engine的_internal库中。同样Influxdb也会通过expvar将监控数据暴露到API上。

目录
相关文章
|
存储 分布式计算 并行计算
Spark GraphX 快速入门
Spark GraphX 快速入门
499 0
Spark GraphX 快速入门
|
域名解析 网络协议 算法
谈谈《流浪地球2》中如何正确的“重启互联网”
DNS是互联网域名和IP转换,寻址调度的重要的基础设施,是互联网中少有的相对中心化的环节。《流浪地球2》中有一个重启互联网,重启(DNS)根服务器的情节,过程惊心动魄,但是从互联网专业视角看仍有一些小瑕疵。观影后笔者撰写小文,聊聊如何正确的“重启互联网”。
16506 14
谈谈《流浪地球2》中如何正确的“重启互联网”
|
算法 Java Windows
Guava-RateLimiter详解
常用的限流算法有漏桶算法和令牌桶算法,guava的RateLimiter使用的是令牌桶算法,也就是以固定的频率向桶中放入令牌,例如一秒钟10枚令牌,实际业务在每次响应请求之前都从桶中获取令牌,只有取到令牌的请求才会被成功响应,获取的方式有两种:阻塞等待令牌或者取不到立即返回失败,下图来自网上: ratelimite原理图 本次实战,我们用的是guava的RateLimiter,场景是spring mvc在处理请求时候,从桶中申请令牌,申请到了就成功响应,申请不到时直接返回失败。
3458 0
系统明明有很多内存,却无法分配出一片大块内存?
系统明明有很多内存,却无法分配出一片大块内存?
|
存储 自然语言处理 容灾
即时通讯技术文集(第19期):IM架构设计基础知识合集 [共13篇]
为了更好地分类阅读 52im.net 总计1000多篇精编文章,我将在每周三推送新的一期技术文集,本次是第19 期。
275 0
|
存储 消息中间件 传感器
物联网平台(Link Platform)介绍|学习笔记
快速学习物联网平台(Link Platform)介绍
1151 15
物联网平台(Link Platform)介绍|学习笔记
|
前端开发 数据库 UED
无感token实现方案
无感token实现方案
|
传感器 消息中间件 网络协议
Paho MQTT 客户端接入阿里云物联网平台(2)| 学习笔记
快速学习 Paho MQTT 客户端接入阿里云物联网平台(2)
1370 1
Paho MQTT 客户端接入阿里云物联网平台(2)| 学习笔记
|
API 数据格式 JSON
宜搭应用与第三方系统数据互通怎么做?
使用 HTTP 连接器向宜搭的 API 发送请求。具体来说,您可以使用集成自动化中的 HTTP 连接器,设置其为定时触发,并将编写的脚本中转换后的数据作为 HTTP 请求的 body,发送 POST 请求到宜搭的 API 接口。
宜搭应用与第三方系统数据互通怎么做?
|
存储 安全 搜索推荐
什么是PDS
简述什么是PDS以及基于PDS搭建的企业网盘的体验感受