流量回放工具之GoReplay input_file 源码分析

简介: 【6月更文挑战6天】流量回放工具之GoReplay input_file 源码分析

@[toc]

前言

GoReplay 对数据流的抽象出了两个概念,即用 输入(input ) 和 输出(output ) 来表示数据来源与去向,统称为 plugin,用介于输入和输出模块之间的中间件实现拓展机制。

input_file.go:实现文件读取的输入插件, 实现 io.Reader 接口,最后根据配置注册到 Plugin.inputs 队列里。

主要参数

  -input-file value   //从一个文件中读取请求
        Read requests from file: 
                gor --input-file ./requests.gor --output-http staging.com
  -input-file-dry-run  //模拟从数据源读取数据而不重新回放它
        Simulate reading from the data source without replaying it. You will get information about expected replay time, number of found records etc.
  -input-file-loop  //循环读取文件
        Loop input files, useful for performance testing.
  -input-file-max-wait duration  //设置请求之间的最大时间间隔
        Set the maximum time between requests. Can help in situations when you have too long periods between request, and you want to skip them. Example: --input-raw-max-wait 1s
  -input-file-read-depth int  //尝试提前读取和缓存多个记录。与此同时,如果请求没有按顺序出现,它还可以对请求进行排序。因为它需要在内存中保存这个缓冲区,较大的值会导致更差的性能(默认为100)
        GoReplay tries to read and cache multiple records, in advance. In parallel it also perform sorting of requests, if they came out of order. Since it needs hold this buffer in memory, bigger values can cause worse performance (default 100)

变速回放

使用说明

GoReplay 实现压力测试的核心特性就是满足流量变速回放功能。支持将录制的生产实际请求流量减少或者放大回放以用于压力测试

比如运行以下命令,将流量从文件回放到 237 服务器,并放大两倍:

[root@vm-1 ~]./gor --input-file "requests.gor|200%" --output-http="http://172.16.106.237:8082"
2021/08/17 15:03:58 [PPID 12356 and PID 18187] Version:1.3.0
[DEBUG][elapsed 1.361742ms]: [INPUT-FILE] FileInput: end of file 'requests.gor'
  • requests.gor|1:最大不超过1QPS;
  • requests.gor|100%:超过原流量的100%。

源码解析

gor.go 为 main 方法,初始化所有插件,启动 emiter(核心处理器) 监听

初始化插件:
image.png

启动 emitter:
image.png

原始代码如下:

func main() {
   
   
    //获取环境变量
    if os.Getenv("GOMAXPROCS") == "" {
   
   
        //设置最大逻辑核
        runtime.GOMAXPROCS(runtime.NumCPU() * 2)
    }

    args := os.Args[1:]
    var plugins *InOutPlugins
    //文件服务
    if len(args) > 0 && args[0] == "file-server" {
   
   
        if len(args) != 2 {
   
   
            log.Fatal("You should specify port and IP (optional) for the file server. Example: `gor file-server :80`")
        }
        dir, _ := os.Getwd()

        Debug(0, "Started example file server for current directory on address ", args[1])

        log.Fatal(http.ListenAndServe(args[1], loggingMiddleware(args[1], http.FileServer(http.Dir(dir)))))
    } else {
   
   
        //解析命令行参数
        flag.Parse()
        //初始化全局的 Settings 变量
        checkSettings()
        //初始化可用插件
        plugins = NewPlugins()
    }

    log.Printf("[PPID %d and PID %d] Version:%s\n", os.Getppid(), os.Getpid(), VERSION)

    if len(plugins.Inputs) == 0 || len(plugins.Outputs) == 0 {
   
   
        log.Fatal("Required at least 1 input and 1 output")
    }

    if *memprofile != "" {
   
   
        profileMEM(*memprofile)
    }

    if *cpuprofile != "" {
   
   
        profileCPU(*cpuprofile)
    }

    if Settings.Pprof != "" {
   
   
        go func() {
   
   
            log.Println(http.ListenAndServe(Settings.Pprof, nil))
        }()
    }

    closeCh := make(chan int)
    //程序核心事件处理
    emitter := NewEmitter()
    //调用 Start 函数,启动 emitter
    go emitter.Start(plugins, Settings.Middleware)
    if Settings.ExitAfter > 0 {
   
   
        log.Printf("Running gor for a duration of %s\n", Settings.ExitAfter)

        time.AfterFunc(Settings.ExitAfter, func() {
   
   
            log.Printf("gor run timeout %s\n", Settings.ExitAfter)
            close(closeCh)
        })
    }
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)
    exit := 0
    select {
   
   
    case <-c:
        exit = 1
    case <-closeCh:
        exit = 0
    }
    //关闭所有协程
    emitter.Close()
    os.Exit(exit)
}

plugins.go 类中,通过 limiter 类执行各种变速操作:

image.png

源代码如下:

// Automatically detects type of plugin and initialize it
//
// See this article if curious about reflect stuff below: http://blog.burntsushi.net/type-parametric-functions-golang
func (plugins *InOutPlugins) registerPlugin(constructor interface{
   
   }, options ...interface{
   
   }) {
   
   
    var path, limit string
    vc := reflect.ValueOf(constructor)

    // Pre-processing options to make it work with reflect
    vo := []reflect.Value{
   
   }
    for _, oi := range options {
   
   
        vo = append(vo, reflect.ValueOf(oi))
    }

    if len(vo) > 0 {
   
   
        // Removing limit options from path
        path, limit = extractLimitOptions(vo[0].String())

        // Writing value back without limiter "|" options
        vo[0] = reflect.ValueOf(path)
    }

    // Calling our constructor with list of given options
    plugin := vc.Call(vo)[0].Interface()

    if limit != "" {
   
   
        plugin = NewLimiter(plugin, limit)
    }

    // Some of the output can be Readers as well because return responses
    if r, ok := plugin.(PluginReader); ok {
   
   
        plugins.Inputs = append(plugins.Inputs, r)
    }

    if w, ok := plugin.(PluginWriter); ok {
   
   
        plugins.Outputs = append(plugins.Outputs, w)
    }
    plugins.All = append(plugins.All, plugin)
}

plugins.go 类中,对类似 'request.gor|100%' 的参数进行解析:

image.png

源代码如下:

// extractLimitOptions detects if plugin get called with limiter support
// Returns address and limit
func extractLimitOptions(options string) (string, string) {
   
   
    split := strings.Split(options, "|")

    if len(split) > 1 {
   
   
        return split[0], split[1]
    }

    return split[0], ""
}

主要执行方法在 limiter.go 类,源代码如下:

package main

import (
    "fmt"
    "io"
    "math/rand"
    "strconv"
    "strings"
    "time"
)

// Limiter is a wrapper for input or output plugin which adds rate limiting
type Limiter struct {
   
   
    plugin    interface{
   
   }
    limit     int
    isPercent bool

    currentRPS  int
    currentTime int64
}

func parseLimitOptions(options string) (limit int, isPercent bool) {
   
   
    if n := strings.Index(options, "%"); n > 0 {
   
   
        limit, _ = strconv.Atoi(options[:n])
        isPercent = true
    } else {
   
   
        limit, _ = strconv.Atoi(options)
        isPercent = false
    }

    return
}

// NewLimiter constructor for Limiter, accepts plugin and options
// `options` allow to sprcify relatve or absolute limiting
func NewLimiter(plugin interface{
   
   }, options string) PluginReadWriter {
   
   
    l := new(Limiter)
    l.limit, l.isPercent = parseLimitOptions(options)
    l.plugin = plugin
    l.currentTime = time.Now().UnixNano()

    // FileInput have its own rate limiting. Unlike other inputs we not just dropping requests, we can slow down or speed up request emittion.
    if fi, ok := l.plugin.(*FileInput); ok && l.isPercent {
   
   
        fi.speedFactor = float64(l.limit) / float64(100)
    }

    return l
}

func (l *Limiter) isLimited() bool {
   
   
    // File input have its own limiting algorithm
    if _, ok := l.plugin.(*FileInput); ok && l.isPercent {
   
   
        return false
    }

    if l.isPercent {
   
   
        return l.limit <= rand.Intn(100)
    }

    if (time.Now().UnixNano() - l.currentTime) > time.Second.Nanoseconds() {
   
   
        l.currentTime = time.Now().UnixNano()
        l.currentRPS = 0
    }

    if l.currentRPS >= l.limit {
   
   
        return true
    }

    l.currentRPS++

    return false
}

// PluginWrite writes message to this plugin
func (l *Limiter) PluginWrite(msg *Message) (n int, err error) {
   
   
    if l.isLimited() {
   
   
        return 0, nil
    }
    if w, ok := l.plugin.(PluginWriter); ok {
   
   
        return w.PluginWrite(msg)
    }
    // avoid further writing
    return 0, io.ErrClosedPipe
}

// PluginRead reads message from this plugin
func (l *Limiter) PluginRead() (msg *Message, err error) {
   
   
    if r, ok := l.plugin.(PluginReader); ok {
   
   
        msg, err = r.PluginRead()
    } else {
   
   
        // avoid further reading
        return nil, io.ErrClosedPipe
    }

    if l.isLimited() {
   
   
        return nil, nil
    }

    return
}

func (l *Limiter) String() string {
   
   
    return fmt.Sprintf("Limiting %s to: %d (isPercent: %v)", l.plugin, l.limit, l.isPercent)
}

// Close closes the resources.
func (l *Limiter) Close() error {
   
   
    if fi, ok := l.plugin.(io.Closer); ok {
   
   
        fi.Close()
    }
    return nil
}

limiter.go 类同样实现文件限速功能:

image.png

input_file.go 类调用了文件限速功能:

image.png

文件循环读取

主要参数为:--input-file-loop

比如:

$ sudo ./gor --input-file 'request.gor|10000%' --input-file-loop --output-http 'http://10.96.136.36:8201'

在 plufins.go 类中应用文件循环使用:

image.png

源代码:

    for _, options := range Settings.InputFile {
   
   
        plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileMaxWait, Settings.InputFileDryRun)
    }

input_file.go 类中实现文件读取功能:

image.png

源代码如下:

// NewFileInput constructor for FileInput. Accepts file path as argument.
func NewFileInput(path string, loop bool, readDepth int, maxWait time.Duration, dryRun bool) (i *FileInput) {
   
   
    i = new(FileInput)
    i.data = make(chan []byte, 1000)
    i.exit = make(chan bool)
    i.path = path
    i.speedFactor = 1
    i.loop = loop
    i.readDepth = readDepth
    i.stats = expvar.NewMap("file-" + path)
    i.dryRun = dryRun
    i.maxWait = maxWait

    if err := i.init(); err != nil {
   
   
        return
    }

    go i.emit()

    return
}

FileInput的构造函数,传入文件路径等参数。

image.png

源代码如下:

// FileInput can read requests generated by FileOutput
type FileInput struct {
   
   
    mu          sync.Mutex
    data        chan []byte
    exit        chan bool
    path        string
    readers     []*fileInputReader
    speedFactor float64
    loop        bool
    readDepth   int
    dryRun      bool
    maxWait     time.Duration

    stats *expvar.Map
}

判断是否循环读取:

image.png

源代码如下:

        if reader == nil {
   
   
            if i.loop {
   
   
                i.init()
                lastTime = -1
                continue
            } else {
   
   
                break
            }
        }

核心代码逻辑调用

image.png

image.png

目录
相关文章
|
3月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之本地启动时,如何处理报错:The file STDOUT does not exist on the TaskExecutor
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
Java Docker 容器
典型热点应用问题之修改dockerfile以使用SYNC语法的问题如何解决
典型热点应用问题之修改dockerfile以使用SYNC语法的问题如何解决
|
4月前
|
运维 关系型数据库 流计算
实时计算 Flink版产品使用问题之作业运维界面无状态启动并指定时间和代码里指'scan.startup.mode'='latest-offset',哪个优先级更高
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
中间件
流量回放工具之GoReplay output-http 源码分析
【6月更文挑战5天】流量回放工具之GoReplay output-http 源码分析
84 2
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错之报错File is not a valid field name 如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版操作报错之在执行任务时遇到了一个IO错误,具体表现为无法从本地主机(localhost)下载文件,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错合集之在运行过程中遇到"Could not upload job files"的问题如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
153 0
|
存储 运维 JavaScript
【Node.js实战】一文带你开发博客项目之日志(文件读写、stream流、写日志)
【Node.js实战】一文带你开发博客项目之日志(文件读写、stream流、写日志)
256 3
|
数据采集 监控 Java
网站流量日志分析--数据预处理-- 点击流模型visit编程实现思路| 学习笔记
快速学习网站流量日志分析--数据预处理--点击流模型 visit 编程实现思路
网站流量日志分析--数据预处理-- 点击流模型visit编程实现思路| 学习笔记
|
6月前
|
存储 运维 监控
日志服务 Scan 功能工作机制与最佳实践
大数据快速增长的需要泛日志(Log/Trace/Metric)是大数据的重要组成,伴随着每一年业务峰值的新脉冲,日志数据量在快速增长。同时,业务数字化运营、软件可观测性等浪潮又在对日志的存储、计算提出更高的要求。从时效性角度看日志计算引擎:数仓覆盖 T + 1 日志处理,准实时系统(搜索引擎、OLA...
159 0
日志服务 Scan 功能工作机制与最佳实践