Learn Influxdb the hard way (2) - Dive into the code backbone

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
简介:

前言

在上一篇文章我们从上帝视角鸟瞰了一下Influxdb的组件结构,在这篇文章中,我们会开始深入代码,从代码的流程中帮助大家切割Influxdb的核心组件。希望在看本篇文章的时候,大家能够将Influxdb的代码下载的本地,对照进行查看。

git clone -b v1.5.0 git@github.com:influxdata/influxdb.git

代码主干流程

Influxdb的源码仓库中包含了influx、influx_inspect、influx_stress、influx_tsm、influxd、store等多个子项目,对于本系列而言,更多的侧重在Influxd也就是Influxdb的主要存储Server。

首先我们进入到cmd/influxd/main.go的入口文件,进行代码跟踪,进入到run命令的代码分支流程下。

    //cmd/influxd/run/command.go 133行
    
    s, err := NewServer(config, buildInfo)
    if err != nil {
        return fmt.Errorf("create server: %s", err)
    }
    s.Logger = cmd.Logger
    s.CPUProfile = options.CPUProfile
    s.MemProfile = options.MemProfile
    if err := s.Open(); err != nil {
        return fmt.Errorf("open server: %s", err)
    }
    cmd.Server = s

    // Begin monitoring the server's error channel.
    go cmd.monitorServerErrors()

可以看到Influxdb中Server的构建,并最终调用了Server的Open方法启动Server,这个Server对象是Influxdb的逻辑封装。我们来看下Server包含的内容。

    //cmd/influxd/run/server.go 158行
    
    s.Monitor = monitor.New(s, c.Monitor)
    s.config.registerDiagnostics(s.Monitor)

    if err := s.MetaClient.Open(); err != nil {
        return nil, err
    }

    s.TSDBStore = tsdb.NewStore(c.Data.Dir)
    s.TSDBStore.EngineOptions.Config = c.Data

    // Copy TSDB configuration.
    s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine
    s.TSDBStore.EngineOptions.IndexVersion = c.Data.Index

    // Create the Subscriber service
    s.Subscriber = subscriber.NewService(c.Subscriber)

    // Initialize points writer.
    s.PointsWriter = coordinator.NewPointsWriter()
    s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
    s.PointsWriter.TSDBStore = s.TSDBStore

    // Initialize query executor.
    s.QueryExecutor = query.NewQueryExecutor()
    s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
        MetaClient:  s.MetaClient,
        TaskManager: s.QueryExecutor.TaskManager,
        TSDBStore:   coordinator.LocalTSDBStore{Store: s.TSDBStore},
        ShardMapper: &coordinator.LocalShardMapper{
            MetaClient: s.MetaClient,
            TSDBStore:  coordinator.LocalTSDBStore{Store: s.TSDBStore},
        },
        Monitor:           s.Monitor,
        PointsWriter:      s.PointsWriter,
        MaxSelectPointN:   c.Coordinator.MaxSelectPointN,
        MaxSelectSeriesN:  c.Coordinator.MaxSelectSeriesN,
        MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
    }
    s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
    s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
    s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries

    // Initialize the monitor
    s.Monitor.Version = s.buildInfo.Version
    s.Monitor.Commit = s.buildInfo.Commit
    s.Monitor.Branch = s.buildInfo.Branch
    s.Monitor.BuildTime = s.buildInfo.Time
    s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter)

Server的核心代码主要就是实例化内部组件,主要包含Monitor、Subscriber、PointsWriter和QueryExecutor。在上篇文章中已经简单的介绍过这几个组件的作用,在此先不过多赘述,我们再来看下Server的Open方法。

    //cmd/influxd/run/server.go 371行  
    
    s.appendMonitorService()
    s.appendPrecreatorService(s.config.Precreator)
    s.appendSnapshotterService()
    s.appendContinuousQueryService(s.config.ContinuousQuery)
    s.appendHTTPDService(s.config.HTTPD)
    s.appendStorageService(s.config.Storage)
    s.appendRetentionPolicyService(s.config.Retention)
    for _, i := range s.config.GraphiteInputs {
        if err := s.appendGraphiteService(i); err != nil {
            return err
        }
    }
    for _, i := range s.config.CollectdInputs {
        s.appendCollectdService(i)
    }
    for _, i := range s.config.OpenTSDBInputs {
        if err := s.appendOpenTSDBService(i); err != nil {
            return err
        }
    }
    for _, i := range s.config.UDPInputs {
        s.appendUDPService(i)
    }

    s.Subscriber.MetaClient = s.MetaClient
    s.PointsWriter.MetaClient = s.MetaClient
    s.Monitor.MetaClient = s.MetaClient

    s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)

    // Configure logging for all services and clients.
    if s.config.Meta.LoggingEnabled {
        s.MetaClient.WithLogger(s.Logger)
    }
    s.TSDBStore.WithLogger(s.Logger)
    if s.config.Data.QueryLogEnabled {
        s.QueryExecutor.WithLogger(s.Logger)
    }
    s.PointsWriter.WithLogger(s.Logger)
    s.Subscriber.WithLogger(s.Logger)
    for _, svc := range s.Services {
        svc.WithLogger(s.Logger)
    }
    s.SnapshotterService.WithLogger(s.Logger)
    s.Monitor.WithLogger(s.Logger)

    // Open TSDB store.
    if err := s.TSDBStore.Open(); err != nil {
        return fmt.Errorf("open tsdb store: %s", err)
    }

    // Open the subcriber service
    if err := s.Subscriber.Open(); err != nil {
        return fmt.Errorf("open subscriber: %s", err)
    }

    // Open the points writer service
    if err := s.PointsWriter.Open(); err != nil {
        return fmt.Errorf("open points writer: %s", err)
    }

    s.PointsWriter.AddWriteSubscriber(s.Subscriber.Points())

    for _, service := range s.Services {
        if err := service.Open(); err != nil {
            return fmt.Errorf("open service: %s", err)
        }
    }

在Server的Open中先进行了Service的注册,然后将实例化的内部组件进行启动并设置相应的Logger等配置,最后依次启动注册在Server上的服务。

我们可以挑选appendPrecreatorService作为范例进行分析,来看下一个Service的注册过程。

//cmd/influxd/run/server.go 320行  

func (s *Server) appendPrecreatorService(c precreator.Config) error {
    if !c.Enabled {
        return nil
    }
    srv := precreator.NewService(c)
    srv.MetaClient = s.MetaClient
    s.Services = append(s.Services, srv)
    return nil
}

在本例中讲解的是PrecreatorService这个Service的注册过程,首先实例化一个PrecreatorService,然后设置其使用的MetaClient,最后将PrecreatorService的实例注册到Server上,在Influxdb中Service是一个接口类型,需要实现如下三个方法,分别负责统一的日志接入,启动和停止。

//cmd/influxd/run/server.go 558行

// Service represents a service attached to the server.
type Service interface {
    WithLogger(log *zap.Logger)
    Open() error
    Close() error
}

这样Influxdb的代码基本已经切割清晰了,首先是一个Server负责实例化几个全局的内部组件单例,然后生成上层业务包装的Service,最后再依次启动。

最后

在接下来的文章中,我们会深入到每个Service中,依次讲解他们的功能与原理。如果有错误的地方麻烦大家多多指正。

目录
相关文章
|
7月前
|
机器学习/深度学习 自然语言处理 算法
【论文精读】ACL 2022:Graph Pre-training for AMR Parsing and Generation
【论文精读】ACL 2022:Graph Pre-training for AMR Parsing and Generation
|
16天前
|
算法 数据挖掘 测试技术
文献解读-Processing UMI Datasets at High Accuracy and Efficiency with the Sentieon ctDNA Analysis Pipeline
Sentieon ctDNA分析流程通过创新的算法设计和高效的软件实现,为高深度、大panel的ctDNA测序数据分析提了一个快速而准确的解决方案。它在多个数据集上均展现出优于或等同于现有方法的性能,同时大幅提高了处理速度。这一进展有望推动ctDNA技术在临床肿瘤学中的广泛应用,特别是在早期癌症检测和最小残留病监测等领域。
37 8
|
2月前
|
算法 数据挖掘 数据处理
文献解读-Sentieon DNAscope LongRead – A highly Accurate, Fast, and Efficient Pipeline for Germline Variant Calling from PacBio HiFi reads
PacBio® HiFi 测序是第一种提供经济、高精度长读数测序的技术,其平均读数长度超过 10kb,平均碱基准确率达到 99.8% 。在该研究中,研究者介绍了一种准确、高效的 DNAscope LongRead 管道,用于从 PacBio® HiFi 读数中调用胚系变异。DNAscope LongRead 是对 Sentieon 的 DNAscope 工具的修改和扩展,该工具曾获美国食品药品管理局(FDA)精密变异调用奖。
30 2
文献解读-Sentieon DNAscope LongRead – A highly Accurate, Fast, and Efficient Pipeline for Germline Variant Calling from PacBio HiFi reads
|
4月前
|
机器学习/深度学习 存储 算法
【博士每天一篇文献-算法】Memory augmented echo state network for time series prediction
本文介绍了一种记忆增强的回声状态网络(MA-ESN),它通过在储层中引入线性记忆模块和非线性映射模块来平衡ESN的记忆能力和非线性映射能力,提高了时间序列预测的性能,并在多个基准数据集上展示了其优越的记忆能力和预测精度。
34 3
【博士每天一篇文献-算法】Memory augmented echo state network for time series prediction
|
4月前
|
机器学习/深度学习 算法 物联网
【博士每天一篇论文-算法】Overview of Echo State Networks using Different Reservoirs and Activation Functions
本文研究了在物联网网络中应用回声状态网络(ESN)进行交通预测的不同拓扑结构,通过与SARIMA、CNN和LSTM等传统算法的比较,发现特定配置的ESN在数据速率和数据包速率预测方面表现更佳,证明了ESN在网络流量预测中的有效性。
37 4
|
算法 Linux Shell
SGAT丨Single Gene Analysis Tool
SGAT丨Single Gene Analysis Tool
|
机器学习/深度学习 自然语言处理 算法
SS-AGA:Multilingual Knowledge Graph Completion with Self-Supervised Adaptive Graph Alignment 论文解读
预测知识图(KG)中缺失的事实是至关重要的,因为现代知识图远未补全。由于劳动密集型的人类标签,当处理以各种语言表示的知识时,这种现象会恶化。
112 0
|
机器学习/深度学习 存储 自然语言处理
Bi-SimCut: A Simple Strategy for Boosting Neural Machine Translation 论文笔记
Bi-SimCut: A Simple Strategy for Boosting Neural Machine Translation 论文笔记
sbs
|
存储 SQL 人工智能
The Volcano Optimizer Generator: Extensibility and Efficient Search 论文翻译
原文:The Volcano Optimizer Generator: Extensibility and Efficient SearchThe Volcano Optimizer Generator: Extensibility and Efficient Search 论文翻译。2023.01.25 —— by zz【中括号内为译者注】对原文部分关键术语,或重点句有加粗。便于定位。为了避免英
sbs
267 0
The Volcano Optimizer Generator: Extensibility and Efficient Search 论文翻译
|
机器学习/深度学习 算法 数据挖掘
Re18:读论文 GCI Everything Has a Cause: Leveraging Causal Inference in Legal Text Analysis
Re18:读论文 GCI Everything Has a Cause: Leveraging Causal Inference in Legal Text Analysis
Re18:读论文 GCI Everything Has a Cause: Leveraging Causal Inference in Legal Text Analysis