流量回放工具之 goreplay 核心源码分析

简介: 【6月更文挑战第3天】流量回放工具之 goreplay 核心源码分析

一、前言

goreplay 前称是 gor,一个简单的 TCP/HTTP 流量录制及重放的工具,主要用 Go 语言编写。

Github地址:https://github.com/buger/goreplay

在上篇文章中我已经介绍了初级入门和基本使用了。

二、工程结构

这里以最新的 v1.3 版本为例,与 v1.0 的代码存在较大差异。

 ~/GoProjects/gor_org/goreplay   release-1.3 ±✚  tree -L 1

.
├── COMM-LICENSE
├── Dockerfile
├── Dockerfile.dev
├── ELASTICSEARCH.md
├── LICENSE.txt
├── Makefile
├── Procfile
├── README.md
├── byteutils
├── capture
├── circle.yml
├── docs
├── elasticsearch.go
├── emitter.go 
├── emitter_test.go
├── examples
├── go.mod
├── go.sum
├── gor.go  
├── gor_stat.go
├── homebrew
├── http_modifier.go
├── http_modifier_settings.go
├── http_modifier_settings_test.go
├── http_modifier_test.go
├── http_prettifier.go
├── http_prettifier_test.go
├── input_dummy.go
├── input_file.go
├── input_file_test.go
├── input_http.go
├── input_http_test.go
├── input_kafka.go
├── input_kafka_test.go
├── input_raw.go
├── input_raw_test.go
├── input_tcp.go
├── input_tcp_test.go
├── kafka.go
├── limiter.go
├── limiter_test.go
├── middleware
├── middleware.go
├── middleware_test.go
├── mkdocs.yml
├── output_binary.go 
├── output_dummy.go
├── output_file.go
├── output_file_test.go
├── output_http.go
├── output_http_test.go
├── output_kafka.go
├── output_kafka_test.go
├── output_null.go
├── output_s3.go
├── output_tcp.go
├── output_tcp_test.go
├── plugins.go     
├── plugins_test.go
├── pro.go
├── proto
├── protocol.go
├── ring
├── s3
├── s3_reader.go
├── s3_test.go
├── settings.go  
├── settings_test.go
├── sidenav.css
├── simpletime
├── site
├── size
├── snapcraft.yaml
├── tcp
├── tcp_client.go
├── test_input.go
├── test_output.go
├── vendor
└── version.go

工程目录比较扁平,主要看 plugin.gosettings.goemitter.go 几个主要文件,其它分 input_xxxoutput_xxx 都是适配具体协议的输入输出插件,程序入口是 gor.go 的 main 函数。

主要文件说明:

  • settings.go:实现对于启动命令参数的解析,决定其注册那些插件到 Plugin.InputsPlugin.Outputs两个列表里。
  • plugin.go:主要是所有输入输出插件的管理。
  • emitter.go:程序核心事件处理,实现对于 Plugin.Inputs 输入流的读取、判断是否需要进行 middlewear 的处理、http修改等,然后异步复制流量到所有 Plugin.outputs,同时将所有 Plugin.outputs 中有 response 的数据,复制到所有 outputs 中。
  • input_xxx.go:主要是输入的插件,实现 tcp/http/raw/kafka等协议, 实现 io.Reader 接口,最后根据配置注册到 Plugin.inputs队列里。
  • output_xxx.go:主要是输出的插件,实现 tcp/http/raw/kafka 等协议, 实现 io.Writer 接口,最后根据配置注册到 Plugin.outputs 队列里。

三、主要核心流程

image.png

goreplay 只有 input 和 output 两个概念,是 goreplay 对数据流的抽象,统称为 plugin。

gor.go 中 main 函数,它主要做了以下事情:

image.png

解析命令行参数:

// Parse parses the command-line flags from os.Args[1:]. Must be called
// after all flags are defined and before flags are accessed by the program.
func Parse() {
   
    // Ignore errors; CommandLine is set for ExitOnError.
    CommandLine.Parse(os.Args[1:])
}

image.png

初始化全局的 Settings 变量。

func checkSettings() {
   
    if Settings.OutputFileConfig.SizeLimit < 1 {
   
        Settings.OutputFileConfig.SizeLimit.Set("32mb")
    }
    if Settings.OutputFileConfig.OutputFileMaxSize < 1 {
   
        Settings.OutputFileConfig.OutputFileMaxSize.Set("1tb")
    }
    if Settings.CopyBufferSize < 1 {
   
        Settings.CopyBufferSize.Set("5mb")
    }
}

命令行参数的定义在 settings.go 的 init 函数中,会先于 main 函数执行。

func init() {
   
    flag.Usage = usage
    flag.StringVar(&Settings.Pprof, "http-pprof", "", "Enable profiling. Starts  http server on specified port, exposing special /debug/pprof endpoint. Example: `:8181`")
    flag.IntVar(&Settings.Verbose, "verbose", 0, "set the level of verbosity, if greater than zero then it will turn on debug output")
    flag.BoolVar(&Settings.Stats, "stats", false, "Turn on queue stats output")

    if DEMO == "" {
   
        flag.DurationVar(&Settings.ExitAfter, "exit-after", 0, "exit after specified duration")
    } else {
   
        Settings.ExitAfter = 5 * time.Minute
    }

    flag.BoolVar(&Settings.SplitOutput, "split-output", false, "By default each output gets same traffic. If set to `true` it splits traffic equally among all outputs.")
    flag.BoolVar(&Settings.RecognizeTCPSessions, "recognize-tcp-sessions", false, "[PRO] If turned on http output will create separate worker for each TCP session. Splitting output will session based as well.")

    ......

    // default values, using for tests
    Settings.OutputFileConfig.SizeLimit = 33554432
    Settings.OutputFileConfig.OutputFileMaxSize = 1099511627776
    Settings.CopyBufferSize = 5242880

}

image.png

根据命令行传参初始化插件,在 main 函数中调用 InitPlugins 函数。

// NewPlugins specify and initialize all available plugins
func NewPlugins() *InOutPlugins {
   
    plugins := new(InOutPlugins)

    for _, options := range Settings.InputDummy {
   
        plugins.registerPlugin(NewDummyInput, options)
    }

    ......

    return plugins
}

调用 Start 函数,启动 emitter,每个 input 插件,都启动一个协程,读取 input,写 output。​

/ Start initialize loop for sending data from inputs to outputs
func (e *Emitter) Start(plugins *InOutPlugins, middlewareCmd string) {
   
    if Settings.CopyBufferSize < 1 {
   
        Settings.CopyBufferSize = 5 << 20
    }
    e.plugins = plugins

    if middlewareCmd != "" {
   
        middleware := NewMiddleware(middlewareCmd)

        for _, in := range plugins.Inputs {
   
            middleware.ReadFrom(in)
        }

        e.plugins.Inputs = append(e.plugins.Inputs, middleware)
        e.plugins.All = append(e.plugins.All, middleware)
        e.Add(1)
        go func() {
   
            defer e.Done()
            if err := CopyMulty(middleware, plugins.Outputs...); err != nil {
   
                Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
            }
        }()
    } else {
   
        for _, in := range plugins.Inputs {
   
            e.Add(1)
            go func(in PluginReader) {
   
                defer e.Done()
                if err := CopyMulty(in, plugins.Outputs...); err != nil {
   
                    Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
                }
            }(in)
        }
    }
}

如果只有一个协程,存在性能瓶颈。默认是一个 input 复制多份,写多个 output,如果传了 --split-output 参数,并且有多个 output ,则使用简单的 Round Robin 算法来选 output,不会写多份。多个 input 之间是并行的,但单个 input 到多个 output,是串行的。所有 input 都实现了 io.Reader 接口,output 都实现了 io.Writer 接口。所以阅读代码时,input 的入口是 Read() 方法,output 的入口是 Write() 方法。


// CopyMulty copies from 1 reader to multiple writers
func CopyMulty(src PluginReader, writers ...PluginWriter) error {
   
    wIndex := 0
    modifier := NewHTTPModifier(&Settings.ModifierConfig)
    filteredRequests := make(map[string]int64)
    filteredRequestsLastCleanTime := time.Now().UnixNano()
    filteredCount := 0

    for {
   
        msg, err := src.PluginRead()
        if err != nil {
   
            if err == ErrorStopped || err == io.EOF {
   
                return nil
            }
            return err
        }
        if msg != nil && len(msg.Data) > 0 {
   
            if len(msg.Data) > int(Settings.CopyBufferSize) {
   
                msg.Data = msg.Data[:Settings.CopyBufferSize]
            }
            meta := payloadMeta(msg.Meta)
            if len(meta) < 3 {
   
                Debug(2, fmt.Sprintf("[EMITTER] Found malformed record %q from %q", msg.Meta, src))
                continue
            }
            requestID := byteutils.SliceToString(meta[1])
            // start a subroutine only when necessary
            if Settings.Verbose >= 3 {
   
                Debug(3, "[EMITTER] input: ", byteutils.SliceToString(msg.Meta[:len(msg.Meta)-1]), " from: ", src)
            }
            if modifier != nil {
   
                Debug(3, "[EMITTER] modifier:", requestID, "from:", src)
                if isRequestPayload(msg.Meta) {
   
                    msg.Data = modifier.Rewrite(msg.Data)
                    // If modifier tells to skip request
                    if len(msg.Data) == 0 {
   
                        filteredRequests[requestID] = time.Now().UnixNano()
                        filteredCount++
                        continue
                    }
                    Debug(3, "[EMITTER] Rewritten input:", requestID, "from:", src)

                } else {
   
                    if _, ok := filteredRequests[requestID]; ok {
   
                        delete(filteredRequests, requestID)
                        filteredCount--
                        continue
                    }
                }
            }

            if Settings.PrettifyHTTP {
   
                msg.Data = prettifyHTTP(msg.Data)
                if len(msg.Data) == 0 {
   
                    continue
                }
            }

            if Settings.SplitOutput {
   
                if Settings.RecognizeTCPSessions {
   
                    if !PRO {
   
                        log.Fatal("Detailed TCP sessions work only with PRO license")
                    }
                    hasher := fnv.New32a()
                    hasher.Write(meta[1])

                    wIndex = int(hasher.Sum32()) % len(writers)
                    if _, err := writers[wIndex].PluginWrite(msg); err != nil {
   
                        return err
                    }
                } else {
   
                    // Simple round robin
                    if _, err := writers[wIndex].PluginWrite(msg); err != nil {
   
                        return err
                    }

                    wIndex = (wIndex + 1) % len(writers)
                }
            } else {
   
                for _, dst := range writers {
   
                    if _, err := dst.PluginWrite(msg); err != nil && err != io.ErrClosedPipe {
   
                        return err
                    }
                }
            }
        }

        // Run GC on each 1000 request
        if filteredCount > 0 && filteredCount%1000 == 0 {
   
            // Clean up filtered requests for which we didn't get a response to filter
            now := time.Now().UnixNano()
            if now-filteredRequestsLastCleanTime > int64(60*time.Second) {
   
                for k, v := range filteredRequests {
   
                    if now-v > int64(60*time.Second) {
   
                        delete(filteredRequests, k)
                        filteredCount--
                    }
                }
                filteredRequestsLastCleanTime = time.Now().UnixNano()
            }
        }
    }
}

轮询调度算法的原理是每一次把来自用户的请求轮流分配给内部中的服务器,从1开始,直到 N(内部服务器个数),然后重新开始循环。
算法的优点是其简洁性,它无需记录当前所有连接的状态,所以它是一种无状态调度。

image.png

四、其它的小知识

goreplay 抓包调用 google/gopacket 来实现,后者通过 cgo 来调用 libpcap。整体工具小巧而实用,既可以实现 rawsocket 的抓包,也可以实现 http 的录制、回放,也支持多实例之间的级联。RAW_SOCKET 允许监听任何端口上的流量,因为它们是在IP级别上操作的。端口是 TCP 的特性,具有流量控制、传输可靠等优点。这个包实现了自己的TCP层: 使用tcp_packet 解析TCP包。流控制由 tcp_message.go管理

参考地址:http://en.wikipedia.org/wiki/Raw_socket

image.png

用三个猴头 🐵🙈🙉 emoji 字符作为请求分隔符,第一眼看到感觉挺搞笑的。


比如:
image.png

配置信息全靠启动命令参数。

比如:

/usr/local/bin/gor --input-raw :80 --input-raw-track-response   --input-raw-bpf-filter "host ! 167.xxx.xxx.xx"  --input-raw-override-snaplen --prettify-http --output-http http://192.168.3.110:80 --output-http-timeout 10s --output-http-workers 1000 --output-http-workers-min 100  --http-allow-header "Aww-Csid: xxxxx" --output-http-track-response --http-allow-method POST --middleware "/production/www/go_replay/client/middleware/sync --project {project_name}" --output-http-compatibility-mode --http-allow-url /article/detail

goreplay 支持 Java 程序配合工作的。支持开启插件模式:

gor --input-raw :80 --middleware "java -jar xxx.jar" --output-file request.gor

通过 middleware 参数可以传递一条命令给 gor ,gor 会拉起一个进程执行这个命令。在录制过程中,gory 通过获取进程的标准输入和输出与插件进程进行通信。

数据流向大致如下:

+-------------+     Original request     +--------------+     Modified request      +-------------+
|  Gor input  |----------STDIN---------->|  Middleware  |----------STDOUT---------->| Gor output  |
+-------------+                          +--------------+                           +-------------+
  input-raw                              java -jar xxx.jar                            output-file

拦截器的设置

参考地址:https://github.com/buger/goreplay/wiki/Dealing-with-missing-requests-and-responses

实际使用过程中,发现录制流量并发达到一定量级会丢失很多请求,经过阅读官方文档和测试,发现最相关的一个关键参数是 –input-raw-buffer-size
其主要原因四由于 gor 本身需要对数据包进行读取,协议解析等,借助于 pcap 及 os 缓冲区,当缓冲区不足,到达的数据包不足以组装 Http 请求则出现丢失或失效请求,无法正确处理。

listener.go 该参数是作用在底层录制上:

  inactive.SetTimeout(t.messageExpire)
      inactive.SetPromisc(true)
      inactive.SetImmediateMode(t.immediateMode)
      if t.immediateMode {
   
        log.Println("Setting immediate mode")
      }
      if t.bufferSize > 0 {
   
        inactive.SetBufferSize(int(t.bufferSize))
      }

      handle, herr := inactive.Activate()
      if herr != nil {
   
        log.Println("PCAP Activate error:", herr)
        wg.Done()
        return
      }

在具体复制动作定义bufferSize:

// CopyMulty copies from 1 reader to multiple writers
func CopyMulty(src io.Reader, writers ...io.Writer) (err error) {
   
  buf := make([]byte, Settings.copyBufferSize)
  wIndex := 0
  modifier := NewHTTPModifier(&Settings.modifierConfig)
  filteredRequests := make(map[string]time.Time)
  filteredRequestsLastCleanTime := time.Now()

  ......
}

五、代码调用链路图

最后附送一张 gor 代码调用链路图。

原图地址:

目录
相关文章
|
人工智能 自然语言处理 算法
阿里云智能客服知识运营白皮书
        阿里云智能客服知识运营白皮书的撰写,是协调包括算法工程师、开发工程师、产品设计师、AIT 人工智能训练师人员等多角色,将技术理论基础和实际实践经验进行结合,形成业内首部智能客服知识运营白皮书。白皮书以阿里云智能客服系统为应用标的,面向智能客服中的知识定义、知识应用、知识梳理方法三大环节进行描述和说明,希望为智能客服领域的知识应用提供具备指导性意义的方法论。一直以来,智能客服领域的知
858 1
阿里云智能客服知识运营白皮书
|
网络协议 Java 测试技术
性能工具之常见流量复制工具
我们把用户访问系统造成的数据传输定义为流量,那么在用户访问系统的过程中,我们可以把进入和流出的数据复制下来,进行保存,待后续使用,即离线模式,或者转发到一个新的服务器,立即使用,即在线模式。
943 2
性能工具之常见流量复制工具
|
Unix Linux Go
流量回放工具之 Goreplay 安装及初级使用
【6月更文挑战第2天】流量回放工具之 Goreplay 安装及初级使用
1469 3
|
中间件
流量回放工具之GoReplay output-http 源码分析
【6月更文挑战5天】流量回放工具之GoReplay output-http 源码分析
285 2
|
存储 消息中间件 运维
架构升级的救星!流量回放自动化测试的必备指南
大家好,我是小米,一名29岁的技术宅。今天分享一个物联网领域的实用技能——流量回放自动化测试。系统重构后,测试工作量巨大,本文介绍如何通过日志收集和数据回放进行自动化测试,包括离线、实时和并行回放模式,帮助快速定位Bug,提升测试效率和系统稳定性。欢迎关注我的微信公众号“软件求生”,获取更多技术干货!
582 3
|
缓存 网络协议 测试技术
推荐一款简单易用线上引流测试工具:GoReplay
推荐一款简单易用线上引流测试工具:GoReplay
697 0
推荐一款简单易用线上引流测试工具:GoReplay
|
缓存 Linux 网络安全
解决 CentOS 7 官方 yum 仓库无法使用的最佳实践
【8月更文挑战第18天】若 CentOS 7 的官方 YUM 仓库无法使用,可按以下步骤解决: 1. **检查网络连接**: - 确认服务器能正常上网,可通过访问外部网站或网络诊断测试。 - 检查防火墙设置,避免其阻挡 YUM 的网络访问。 2. **检查 YUM 配置**: - 核实 `/etc/yum.repos.d/` 下的 `CentOS-Base.repo` 文件中仓库地址正确无误。 - 确认配置文件内的 `enabled` 选项设为 `1` 以启用仓库。
5728 0
|
算法 Oracle Java
一文详解|从JDK8飞升到JDK17,再到未来的JDK21
本文深入浅出地解析了从JDK8到JDK17版本升级的新特性,并展望后续将会更新的JDK21.
12072 8
|
测试技术 Linux Go
Grafana 系列 - 统一展示 -9-Jaeger 数据源
Grafana 系列 - 统一展示 -9-Jaeger 数据源