流量回放工具之GoReplay output-http 源码分析

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 【6月更文挑战5天】流量回放工具之GoReplay output-http 源码分析

前言

GoReplay 对数据流的抽象出了两个概念,即用 输入(input )输出(output ) 来表示数据来源与去向,统称为 plugin,用介于输入和输出模块之间的中间件实现拓展机制。

output_http.go:主要是 HTTP 输出的插件,实现 HTTP 协议, 实现 io.Writer 接口,最后根据配置注册到 Plugin.outputs 队列里。

参数说明

-output-http value  //转发进入的请求到一个http地址上
        Forwards incoming requests to given http address.
                # Redirect all incoming requests to staging.com address 
                gor --input-raw :80 --output-http http://staging.com
  -output-http-elasticsearch string  //把请求和响应状态发送到 ElasticSearch
        Send request and response stats to ElasticSearch:
                gor --input-raw :8080 --output-http staging.com --output-http-elasticsearch 'es_host:api_port/index_name'
  -output-http-queue-len int //http输出队列大小
        Number of requests that can be queued for output, if all workers are busy. default = 1000 (default 1000)
  -output-http-redirects int  // 设置多少次重定向被允许,默认忽略
        Enable how often redirects should be followed.
  -output-http-response-buffer value  //最大接收响应大小(缓冲区)
        HTTP response buffer size, all data after this size will be discarded.
  -output-http-skip-verify
        Don't verify hostname on TLS secure connection.
  -output-http-stats  //每5秒钟输出一次输出队列的状态
        Report http output queue stats to console every N milliseconds. See output-http-stats-ms
  -output-http-stats-ms int
        Report http output queue stats to console every N milliseconds. default: 5000 (default 5000)
  -output-http-timeout duration  //指定 http 的 request/response 超时时间,默认是 5 秒 
        Specify HTTP request/response timeout. By default 5s. Example: --output-http-timeout 30s (default 5s)
  -output-http-track-response
        If turned on, HTTP output responses will be set to all outputs like stdout, file and etc.
  -output-http-worker-timeout duration
        Duration to rollback idle workers. (default 2s)
  -output-http-workers int  //gor默认是动态的扩展工作者数量,你也可以指定固定数量的工作者
        Gor uses dynamic worker scaling. Enter a number to set a maximum number of workers. default = 0 = unlimited.
  -output-http-workers-min int
        Gor uses dynamic worker scaling. Enter a number to set a minimum number of workers. default = 1.

默认情况下,Gor 创建一个动态工作池:
它从 10 开始,并在 HTTP 输出队列长度大于 10 时创建更多的 HTTP 输出协程。创建的协程数量(N)等于该工作时间的队列长度检查并发现其长度大于10.每次将消息写入 HTTP 输出队列时都检查队列长度。在产生 N 个协程的请求得到满足之前,不会再有协程创建。如果动态协程池当时不能处理消息,它将睡眠 100 毫秒。如果动态工作协程无法处理消息2秒钟,则会死亡。可以使用 --output-http-workers=20 选项指定固定数量的协程。

HTTP 输出工作数量

NewHTTPOutput 默认情况:
image.png

// NewHTTPOutput constructor for HTTPOutput
// Initialize workers
func NewHTTPOutput(address string, config *HTTPOutputConfig) PluginReadWriter {
   
   
    o := new(HTTPOutput)
    var err error
    config.url, err = url.Parse(address)
    if err != nil {
   
   
        log.Fatal(fmt.Sprintf("[OUTPUT-HTTP] parse HTTP output URL error[%q]", err))
    }
    if config.url.Scheme == "" {
   
   
        config.url.Scheme = "http"
    }
    config.rawURL = config.url.String()
    if config.Timeout < time.Millisecond*100 {
   
   
        config.Timeout = time.Second
    }
    if config.BufferSize <= 0 {
   
   
        config.BufferSize = 100 * 1024 // 100kb
    }
    if config.WorkersMin <= 0 {
   
   
        config.WorkersMin = 1
    }
    if config.WorkersMin > 1000 {
   
   
        config.WorkersMin = 1000
    }
    if config.WorkersMax <= 0 {
   
   
        config.WorkersMax = math.MaxInt32 // idealy so large
    }
    if config.WorkersMax < config.WorkersMin {
   
   
        config.WorkersMax = config.WorkersMin
    }
    if config.QueueLen <= 0 {
   
   
        config.QueueLen = 1000
    }
    if config.RedirectLimit < 0 {
   
   
        config.RedirectLimit = 0
    }
    if config.WorkerTimeout <= 0 {
   
   
        config.WorkerTimeout = time.Second * 2
    }
    o.config = config
    o.stop = make(chan bool)
    //是否收集统计信息,统计输出间隔是多少
    if o.config.Stats {
   
   
        o.queueStats = NewGorStat("output_http", o.config.StatsMs)
    }

    o.queue = make(chan *Message, o.config.QueueLen)
    if o.config.TrackResponses {
   
   
        o.responses = make(chan *response, o.config.QueueLen)
    }
    // it should not be buffered to avoid races
    o.stopWorker = make(chan struct{
   
   })

    if o.config.ElasticSearch != "" {
   
   
        o.elasticSearch = new(ESPlugin)
        o.elasticSearch.Init(o.config.ElasticSearch)
    }
    o.client = NewHTTPClient(o.config)
    o.activeWorkers += int32(o.config.WorkersMin)
    for i := 0; i < o.config.WorkersMin; i++ {
   
   
        go o.startWorker()
    }
    go o.workerMaster()
    return o
}

配置后启动 httpclient:
image.png

o.client = NewHTTPClient(o.config)
    o.activeWorkers += int32(o.config.WorkersMin)
    for i := 0; i < o.config.WorkersMin; i++ {
   
   
        go o.startWorker()
    }

启动多个发送协程:
image.png

func (o *HTTPOutput) startWorker() {
   
   
    for {
   
   
        select {
   
   
        case <-o.stopWorker:
            return
        case msg := <-o.queue:
            o.sendRequest(o.client, msg)
        }
    }
}

执行发送:
image.png

func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
   
   
    if !isRequestPayload(msg.Meta) {
   
   
        return
    }

    uuid := payloadID(msg.Meta)
    start := time.Now()
    resp, err := client.Send(msg.Data)
    stop := time.Now()

    if err != nil {
   
   
        Debug(1, fmt.Sprintf("[HTTP-OUTPUT] error when sending: %q", err))
        return
    }
    if resp == nil {
   
   
        return
    }

    if o.config.TrackResponses {
   
   
        o.responses <- &response{
   
   resp, uuid, start.UnixNano(), stop.UnixNano() - start.UnixNano()}
    }

    if o.elasticSearch != nil {
   
   
        o.elasticSearch.ResponseAnalyze(msg.Data, resp, start, stop)
    }
}

发送细节,各种配置生效点:
image.png

// Send sends an http request using client create by NewHTTPClient
func (c *HTTPClient) Send(data []byte) ([]byte, error) {
   
   
    var req *http.Request
    var resp *http.Response
    var err error

    req, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(data)))
    if err != nil {
   
   
        return nil, err
    }
    // we don't send CONNECT or OPTIONS request
    if req.Method == http.MethodConnect {
   
   
        return nil, nil
    }

    if !c.config.OriginalHost {
   
   
        req.Host = c.config.url.Host
    }

    // fix #862
    if c.config.url.Path == "" && c.config.url.RawQuery == "" {
   
   
        req.URL.Scheme = c.config.url.Scheme
        req.URL.Host = c.config.url.Host
    } else {
   
   
        req.URL = c.config.url
    }

    // force connection to not be closed, which can affect the global client
    req.Close = false
    // it's an error if this is not equal to empty string
    req.RequestURI = ""

    resp, err = c.Client.Do(req)
    if err != nil {
   
   
        return nil, err
    }
    if c.config.TrackResponses {
   
   
        return httputil.DumpResponse(resp, true)
    }
    _ = resp.Body.Close()
    return nil, nil

HTTP 输出队列

image.png

队列用在哪儿呢?
image.png

代码逻辑调用图

image.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
相关文章
|
8月前
|
编解码 测试技术 索引
性能工具之 Jmeter 使用 HTTP 请求编写 HLS 脚本
在我们简要介绍了 HLS 协议的基础知识,接下来我们详细介绍一种使用 Jmeter 编写压测 HLS 协议脚本的方法。
159 1
性能工具之 Jmeter 使用 HTTP 请求编写 HLS 脚本
|
8月前
|
网络协议 Linux 网络安全
curl(http命令行工具):Linux下最强大的网络数据传输工具
curl(http命令行工具):Linux下最强大的网络数据传输工具
238 0
|
7月前
|
运维 监控 Serverless
函数计算产品使用问题之HTTP触发器被恶意刷流量,该怎么办
函数计算产品作为一种事件驱动的全托管计算服务,让用户能够专注于业务逻辑的编写,而无需关心底层服务器的管理与运维。你可以有效地利用函数计算产品来支撑各类应用场景,从简单的数据处理到复杂的业务逻辑,实现快速、高效、低成本的云上部署与运维。以下是一些关于使用函数计算产品的合集和要点,帮助你更好地理解和应用这一服务。
|
4月前
|
监控 网络协议 应用服务中间件
【Tomcat源码分析】从零开始理解 HTTP 请求处理 (第一篇)
本文详细解析了Tomcat架构中复杂的`Connector`组件。作为客户端与服务器间沟通的桥梁,`Connector`负责接收请求、封装为`Request`和`Response`对象,并传递给`Container`处理。文章通过四个关键问题逐步剖析了`Connector`的工作原理,并深入探讨了其构造方法、`init()`与`start()`方法。通过分析`ProtocolHandler`、`Endpoint`等核心组件,揭示了`Connector`初始化及启动的全过程。本文适合希望深入了解Tomcat内部机制的读者。欢迎关注并点赞,持续更新中。如有问题,可搜索【码上遇见你】交流。
【Tomcat源码分析】从零开始理解 HTTP 请求处理 (第一篇)
|
7月前
|
网络协议 PHP
Swoole 源码分析之 Http Server 模块
想要了解到 `Http Server` 的全貌,其实只要把那张整体的实现图看懂就足以了。但是,如果想要有足够的深度,那么就还需要深入 `Swoole` 的源代码中,就着源码自行分析一遍。同时,也希望这一次的分析,能够给大家带来对 `Swoole` 更多的一些了解。并不要求要深刻的掌握,因为,很多的事情都不可能一蹴而就。从自己的实力出发,勿忘初心。
93 0
Swoole 源码分析之 Http Server 模块
|
6月前
|
SQL
常用工具类---SQL工具,HTTP工具
SQL工具,HTTP工具,两个实用小工具~~~
|
7月前
流量回放工具之 GoReplay output-http-stats(HTTP请求统计) 源码分析
【6月更文挑战4天】流量回放工具之 GoReplay output-http-stats(HTTP请求统计) 源码分析
94 4
|
7月前
|
数据采集 Java API
Java HTTP客户端工具的演变之路
Java HTTP客户端工具的演变之路
|
8月前
|
JSON API 定位技术
.NET集成DeveloperSharp实现http网络请求&与其它工具的比较
该内容介绍了一个支持.NET Core 2.0及以上和.NET Framework 4.0及以上的HTTP请求调用方法,主要讨论了POST和GET两种形式。POST请求较为常见,涉及调用地址、发送参数、HTTP请求头和编码格式设置。文中提供了一个使用DeveloperSharp库发送POST请求的C#代码示例,用于发送短信,其中`IU.HttpPost`方法用于执行POST请求。此外,还提到了`HttpPost`方法的参数和返回值说明。最后简要提及了GET请求,通常用于URL带有查询参数的情况,并给出一个简单的GET请求示例。
|
8月前
|
JSON 应用服务中间件 API
同一端口同一方法提供grpc和http流量支持
以上方法可以让你在同一端口上同时支持gRPC和HTTP流量。具体的选择取决于你的项目需求和技术架构。 买CN2云服务器,免备案服务器,高防服务器,就选蓝易云。百度搜索:蓝易云
141 0