一个golang并行库源码解析

简介: ## 场景 有这样一种场景:四个任务A、B、C, D,其中任务B和C需要并发执行,得到结果1, 任务A执行得到结果2, 结果1和2作为任务D的参数传入,然后执行任务D得到最终结果。我们可以将任务执行顺序用如下图标识: ``` jobA jobB jobC \ \ / \ \ / \ middle \ /

场景

有这样一种场景:四个任务A、B、C, D,其中任务B和C需要并发执行,得到结果1, 任务A执行得到结果2, 结果1和2作为任务D的参数传入,然后执行任务D得到最终结果。我们可以将任务执行顺序用如下图标识:

jobA  jobB   jobC
 \      \     /
  \      \   /
   \      middle
    \      /
     \    /
     jobD

这是一个典型的多任务并发场景,实际上随着任务数量的增多,任务逻辑会更加复杂,如何编写可维护健壮的逻辑代码变得十分重要,虽然golang提供了同步机制,但是需要写很多重复无用的Add/Wait/Done代码,而且代码可读性也很差,这是不能容忍的。

本文介绍一个开源的golang并行库,源码地址https://github.com/buptmiao/parallel

数据结构

1. parallel结构体

type Parallel struct {
        wg        *sync.WaitGroup
        pipes     []*Pipeline
        wgChild   *sync.WaitGroup
        children  []*Parallel
        exception *Handler
}

parallel定义了一个多任务并发实例,主要包括:并发任务管道(pipes)、子任务并发实例(children)、子任务实例等待锁(wgChild)、当前并发任务实例等待锁(wg)

2. pipeline结构体

type Pipeline struct {
        handlers []*Handler
}  
type Handler struct {
        f    interface{}
        args []interface{}
        receivers []interface{}
}       

这里pipeline实际上是一系列并发任务实例handler,每一个handler包括任务函数f, 传入参数args以及返回结果receivers

parallel相关代码

新建parallel实例

func NewParallel() *Parallel {
        res := new(Parallel)
        res.wg = new(sync.WaitGroup)
        res.wgChild = new(sync.WaitGroup)
        res.pipes = make([]*Pipeline, 0, 10)
        return res
}       

注册handler

func (p *Parallel) Register(f interface{}, args ...interface{}) *Handler {
        return p.NewPipeline().Register(f, args...)
}
func (p *Parallel) NewPipeline() *Pipeline {
        pipe := NewPipeline()
        p.Add(pipe)
        return pipe
} 
func (p *Parallel) Add(pipes ...*Pipeline) *Parallel {
        p.wg.Add(len(pipes))
        p.pipes = append(p.pipes, pipes...)
        return p
}

新建子parallel实例

func (p *Parallel) NewChild() *Parallel {
        child := NewParallel()
        child.exception = p.exception
        p.AddChildren(child)
        return child
}
func (p *Parallel) AddChildren(children ...*Parallel) *Parallel {
        p.wgChild.Add(len(children))
        p.children = append(p.children, children...)
        return p
}

任务运行

func (p *Parallel) Run() {
        for _, child := range p.children {
                // this func will never panic
                go func(ch *Parallel) {
                        ch.Run()
                        p.wgChild.Done()
                }(child)
        }
        p.wgChild.Wait() //wait children instance done
        p.do() //run
        p.wg.Wait() //wait all job done
}
func (p *Parallel) do() {
        for _, pipe := range p.pipes {
                go p.Do()
        }
}

pipeline相关代码

新建pipeline实例

func NewPipeline() *Pipeline {
        res := new(Pipeline)
        return res
}       

注册handler

func (p *Pipeline) Register(f interface{}, args ...interface{}) *Handler {
        h := NewHandler(f, args...)
        p.Add(h)
        return h
}       

添加handler

func (p *Pipeline) Add(hs ...*Handler) *Pipeline {
        p.handlers = append(p.handlers, hs...)
        return p
}

任务运行

func (p *Pipeline) Do() {
        for _, h := range p.handlers {
                h.Do()
        }
}

handler相关代码

新建handler实例

func NewHandler(f interface{}, args ...interface{}) *Handler {
        res := new(Handler)
        res.f = f
        res.args = args
        return res
}

运行任务

func (h *Handler) Do() {
        f := reflect.ValueOf(h.f)
        typ := f.Type()
        //check if f is a function
        if typ.Kind() != reflect.Func {
                panic(ErrArgNotFunction)
        }
        //check input length, only check '>' is to allow varargs.
        if typ.NumIn() > len(h.args) {
                panic(ErrInArgLenNotMatch)
        }
        //check output length
        if typ.NumOut() != len(h.receivers) {
                panic(ErrOutArgLenNotMatch)
        }
        //check if output args is ptr
        for _, v := range h.receivers {
                t := reflect.ValueOf(v)
                if t.Type().Kind() != reflect.Ptr {
                        panic(ErrRecvArgTypeNotPtr)
                }
                if t.IsNil() {
                        panic(ErrRecvArgNil)
                }
        }

        inputs := make([]reflect.Value, len(h.args))
        for i := 0; i < len(h.args); i++ {
                if h.args[i] == nil {
                        inputs[i] = reflect.Zero(f.Type().In(i))
                } else {
                        inputs[i] = reflect.ValueOf(h.args[i])
                }
        }
        out := f.Call(inputs)

        for i := 0; i < len(h.receivers); i++ {
                v := reflect.ValueOf(h.receivers[i])
                v.Elem().Set(out[i])
        }
}

demo

package main

import "github.com/buptmiao/parallel"

func testJobA(x, y int) int {
        return x - y
}

func testJobB(x, y int) int {
        return x + y
}

func testJobC(x, y *int, z int) float64 {
        return float64((*x)*(*y)) / float64(z)
}

func main() {
        var x, y int
        var z float64

        p := parallel.NewParallel()

        ch1 := p.NewChild()
        ch1.Register(testJobA, 1, 2).SetReceivers(&x)

        ch2 := p.NewChild()
        ch2.Register(testJobB, 1, 2).SetReceivers(&y)

        p.Register(testJobC, &x, &y, 2).SetReceivers(&z)

        p.Run()

        if x != -1 || y != 3 || z != -1.5 {
                panic("unexpected result")
        }
}
目录
相关文章
|
2月前
|
数据采集 数据挖掘 测试技术
Go与Python爬虫实战对比:从开发效率到性能瓶颈的深度解析
本文对比了Python与Go在爬虫开发中的特点。Python凭借Scrapy等框架在开发效率和易用性上占优,适合快速开发与中小型项目;而Go凭借高并发和高性能优势,适用于大规模、长期运行的爬虫服务。文章通过代码示例和性能测试,分析了两者在并发能力、错误处理、部署维护等方面的差异,并探讨了未来融合发展的趋势。
159 0
|
15天前
|
Cloud Native 安全 Java
Go语言深度解析:从入门到精通的完整指南
🌟 蒋星熠Jaxonic,执着的星际旅人,用Go语言编写代码诗篇。🚀 Go语言以简洁、高效、并发为核心,助力云计算与微服务革新。📚 本文详解Go语法、并发模型、性能优化与实战案例,助你掌握现代编程精髓。🌌 从goroutine到channel,从内存优化到高并发架构,全面解析Go的强大力量。🔧 实战构建高性能Web服务,展现Go在云原生时代的无限可能。✨ 附技术对比、最佳实践与生态全景,带你踏上Go语言的星辰征途。#Go语言 #并发编程 #云原生 #性能优化
|
6月前
|
算法 Go 索引
【LeetCode 热题100】45:跳跃游戏 II(详细解析)(Go语言版)
本文详细解析了力扣第45题“跳跃游戏II”的三种解法:贪心算法、动态规划和反向贪心。贪心算法通过选择每一步能跳到的最远位置,实现O(n)时间复杂度与O(1)空间复杂度,是面试首选;动态规划以自底向上的方式构建状态转移方程,适合初学者理解但效率较低;反向贪心从终点逆向寻找最优跳点,逻辑清晰但性能欠佳。文章对比了各方法的优劣,并提供了Go语言代码实现,助你掌握最小跳跃次数问题的核心技巧。
218 15
|
2月前
|
缓存 监控 安全
告别缓存击穿!Go 语言中的防并发神器:singleflight 包深度解析
在高并发场景中,多个请求同时访问同一资源易导致缓存击穿、数据库压力过大。Go 语言提供的 `singleflight` 包可将相同 key 的请求合并,仅执行一次实际操作,其余请求共享结果,有效降低系统负载。本文详解其原理、实现及典型应用场景,并附示例代码,助你掌握高并发优化技巧。
201 0
|
2月前
|
数据采集 JSON Go
Go语言实战案例:实现HTTP客户端请求并解析响应
本文是 Go 网络与并发实战系列的第 2 篇,详细介绍如何使用 Go 构建 HTTP 客户端,涵盖请求发送、响应解析、错误处理、Header 与 Body 提取等流程,并通过实战代码演示如何并发请求多个 URL,适合希望掌握 Go 网络编程基础的开发者。
|
6月前
|
机器学习/深度学习 存储 算法
【LeetCode 热题100】347:前 K 个高频元素(详细解析)(Go语言版)
这篇文章详细解析了力扣热题 347——前 K 个高频元素的三种解法:哈希表+小顶堆、哈希表+快速排序和哈希表+桶排序。每种方法都附有清晰的思路讲解和 Go 语言代码实现。小顶堆方法时间复杂度为 O(n log k),适合处理大规模数据;快速排序方法时间复杂度为 O(n log n),适用于数据量较小的场景;桶排序方法在特定条件下能达到线性时间复杂度 O(n)。文章通过对比分析,帮助读者根据实际需求选择最优解法,并提供了完整的代码示例,是一篇非常实用的算法学习资料。
350 90
|
4月前
|
存储 设计模式 安全
Go 语言单例模式全解析:从青铜到王者段位的实现方案
单例模式确保一个类只有一个实例,并提供全局访问点,适用于日志、配置管理、数据库连接池等场景。在 Go 中,常用实现方式包括懒汉模式、饿汉模式、双重检查锁定,最佳实践是使用 `sync.Once`,它并发安全、简洁高效。本文详解各种实现方式的优缺点,并提供代码示例与最佳应用建议。
97 5
|
5月前
|
存储 算法 Go
【LeetCode 热题100】17:电话号码的字母组合(详细解析)(Go语言版)
LeetCode 17题解题思路采用回溯算法,通过递归构建所有可能的组合。关键点包括:每位数字对应多个字母,依次尝试;递归构建下一个字符;递归出口为组合长度等于输入数字长度。Go语言实现中,使用map存储数字到字母的映射,通过回溯函数递归生成组合。时间复杂度为O(3^n * 4^m),空间复杂度为O(n)。类似题目包括括号生成、组合、全排列等。掌握回溯法的核心思想,能够解决多种排列组合问题。
131 11
|
5月前
|
Go
【LeetCode 热题100】155:最小栈(详细解析)(Go语言版)
本文详细解析了力扣热题155:最小栈的解题思路与实现方法。题目要求设计一个支持 push、核心思路是使用辅助栈法,通过两个栈(主栈和辅助栈)来维护当前栈中的最小值。具体操作包括:push 时同步更新辅助栈,pop 时检查是否需要弹出辅助栈的栈顶,getMin 时直接返回辅助栈的栈顶。文章还提供了 Go 语言的实现代码,并对复杂度进行了分析。此外,还介绍了单栈 + 差值记录法的进阶思路,并总结了常见易错点,如 pop 操作时忘记同步弹出辅助栈等。
146 6
|
5月前
|
Go 索引
【LeetCode 热题100】739:每日温度(详细解析)(Go语言版)
这篇文章详细解析了 LeetCode 第 739 题“每日温度”,探讨了如何通过单调栈高效解决问题。题目要求根据每日温度数组,计算出等待更高温度的天数。文中推荐使用单调递减栈,时间复杂度为 O(n),优于暴力解法的 O(n²)。通过实例模拟和代码实现(如 Go 语言版本),清晰展示了栈的操作逻辑。此外,还提供了思维拓展及相关题目推荐,帮助深入理解单调栈的应用场景。
150 6

推荐镜像

更多
  • DNS