一个golang并行库源码解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: ## 场景 有这样一种场景:四个任务A、B、C, D,其中任务B和C需要并发执行,得到结果1, 任务A执行得到结果2, 结果1和2作为任务D的参数传入,然后执行任务D得到最终结果。我们可以将任务执行顺序用如下图标识: ``` jobA jobB jobC \ \ / \ \ / \ middle \ /

场景

有这样一种场景:四个任务A、B、C, D,其中任务B和C需要并发执行,得到结果1, 任务A执行得到结果2, 结果1和2作为任务D的参数传入,然后执行任务D得到最终结果。我们可以将任务执行顺序用如下图标识:

jobA  jobB   jobC
 \      \     /
  \      \   /
   \      middle
    \      /
     \    /
     jobD

这是一个典型的多任务并发场景,实际上随着任务数量的增多,任务逻辑会更加复杂,如何编写可维护健壮的逻辑代码变得十分重要,虽然golang提供了同步机制,但是需要写很多重复无用的Add/Wait/Done代码,而且代码可读性也很差,这是不能容忍的。

本文介绍一个开源的golang并行库,源码地址https://github.com/buptmiao/parallel

数据结构

1. parallel结构体

type Parallel struct {
        wg        *sync.WaitGroup
        pipes     []*Pipeline
        wgChild   *sync.WaitGroup
        children  []*Parallel
        exception *Handler
}

parallel定义了一个多任务并发实例,主要包括:并发任务管道(pipes)、子任务并发实例(children)、子任务实例等待锁(wgChild)、当前并发任务实例等待锁(wg)

2. pipeline结构体

type Pipeline struct {
        handlers []*Handler
}  
type Handler struct {
        f    interface{}
        args []interface{}
        receivers []interface{}
}       

这里pipeline实际上是一系列并发任务实例handler,每一个handler包括任务函数f, 传入参数args以及返回结果receivers

parallel相关代码

新建parallel实例

func NewParallel() *Parallel {
        res := new(Parallel)
        res.wg = new(sync.WaitGroup)
        res.wgChild = new(sync.WaitGroup)
        res.pipes = make([]*Pipeline, 0, 10)
        return res
}       

注册handler

func (p *Parallel) Register(f interface{}, args ...interface{}) *Handler {
        return p.NewPipeline().Register(f, args...)
}
func (p *Parallel) NewPipeline() *Pipeline {
        pipe := NewPipeline()
        p.Add(pipe)
        return pipe
} 
func (p *Parallel) Add(pipes ...*Pipeline) *Parallel {
        p.wg.Add(len(pipes))
        p.pipes = append(p.pipes, pipes...)
        return p
}

新建子parallel实例

func (p *Parallel) NewChild() *Parallel {
        child := NewParallel()
        child.exception = p.exception
        p.AddChildren(child)
        return child
}
func (p *Parallel) AddChildren(children ...*Parallel) *Parallel {
        p.wgChild.Add(len(children))
        p.children = append(p.children, children...)
        return p
}

任务运行

func (p *Parallel) Run() {
        for _, child := range p.children {
                // this func will never panic
                go func(ch *Parallel) {
                        ch.Run()
                        p.wgChild.Done()
                }(child)
        }
        p.wgChild.Wait() //wait children instance done
        p.do() //run
        p.wg.Wait() //wait all job done
}
func (p *Parallel) do() {
        for _, pipe := range p.pipes {
                go p.Do()
        }
}

pipeline相关代码

新建pipeline实例

func NewPipeline() *Pipeline {
        res := new(Pipeline)
        return res
}       

注册handler

func (p *Pipeline) Register(f interface{}, args ...interface{}) *Handler {
        h := NewHandler(f, args...)
        p.Add(h)
        return h
}       

添加handler

func (p *Pipeline) Add(hs ...*Handler) *Pipeline {
        p.handlers = append(p.handlers, hs...)
        return p
}

任务运行

func (p *Pipeline) Do() {
        for _, h := range p.handlers {
                h.Do()
        }
}

handler相关代码

新建handler实例

func NewHandler(f interface{}, args ...interface{}) *Handler {
        res := new(Handler)
        res.f = f
        res.args = args
        return res
}

运行任务

func (h *Handler) Do() {
        f := reflect.ValueOf(h.f)
        typ := f.Type()
        //check if f is a function
        if typ.Kind() != reflect.Func {
                panic(ErrArgNotFunction)
        }
        //check input length, only check '>' is to allow varargs.
        if typ.NumIn() > len(h.args) {
                panic(ErrInArgLenNotMatch)
        }
        //check output length
        if typ.NumOut() != len(h.receivers) {
                panic(ErrOutArgLenNotMatch)
        }
        //check if output args is ptr
        for _, v := range h.receivers {
                t := reflect.ValueOf(v)
                if t.Type().Kind() != reflect.Ptr {
                        panic(ErrRecvArgTypeNotPtr)
                }
                if t.IsNil() {
                        panic(ErrRecvArgNil)
                }
        }

        inputs := make([]reflect.Value, len(h.args))
        for i := 0; i < len(h.args); i++ {
                if h.args[i] == nil {
                        inputs[i] = reflect.Zero(f.Type().In(i))
                } else {
                        inputs[i] = reflect.ValueOf(h.args[i])
                }
        }
        out := f.Call(inputs)

        for i := 0; i < len(h.receivers); i++ {
                v := reflect.ValueOf(h.receivers[i])
                v.Elem().Set(out[i])
        }
}

demo

package main

import "github.com/buptmiao/parallel"

func testJobA(x, y int) int {
        return x - y
}

func testJobB(x, y int) int {
        return x + y
}

func testJobC(x, y *int, z int) float64 {
        return float64((*x)*(*y)) / float64(z)
}

func main() {
        var x, y int
        var z float64

        p := parallel.NewParallel()

        ch1 := p.NewChild()
        ch1.Register(testJobA, 1, 2).SetReceivers(&x)

        ch2 := p.NewChild()
        ch2.Register(testJobB, 1, 2).SetReceivers(&y)

        p.Register(testJobC, &x, &y, 2).SetReceivers(&z)

        p.Run()

        if x != -1 || y != 3 || z != -1.5 {
                panic("unexpected result")
        }
}
目录
相关文章
|
11天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
39 2
|
3天前
|
JSON Go 开发者
go-carbon v2.5.0 发布,轻量级、语义化、对开发者友好的 golang 时间处理库
carbon 是一个轻量级、语义化、对开发者友好的 Golang 时间处理库,提供了对时间穿越、时间差值、时间极值、时间判断、星座、星座、农历、儒略日 / 简化儒略日、波斯历 / 伊朗历的支持。
21 4
|
11天前
|
存储 安全 Linux
Golang的GMP调度模型与源码解析
【11月更文挑战第11天】GMP 调度模型是 Go 语言运行时系统的核心部分,用于高效管理和调度大量协程(goroutine)。它通过少量的操作系统线程(M)和逻辑处理器(P)来调度大量的轻量级协程(G),从而实现高性能的并发处理。GMP 模型通过本地队列和全局队列来减少锁竞争,提高调度效率。在 Go 源码中,`runtime.h` 文件定义了关键数据结构,`schedule()` 和 `findrunnable()` 函数实现了核心调度逻辑。通过深入研究 GMP 模型,可以更好地理解 Go 语言的并发机制。
|
21天前
|
存储 Cloud Native Shell
go库介绍:Golang中的Viper库
Viper 是 Golang 中的一个强大配置管理库,支持环境变量、命令行参数、远程配置等多种配置来源。本文详细介绍了 Viper 的核心特点、应用场景及使用方法,并通过示例展示了其强大功能。无论是简单的 CLI 工具还是复杂的分布式系统,Viper 都能提供优雅的配置管理方案。
|
24天前
|
消息中间件 缓存 安全
Future与FutureTask源码解析,接口阻塞问题及解决方案
【11月更文挑战第5天】在Java开发中,多线程编程是提高系统并发性能和资源利用率的重要手段。然而,多线程编程也带来了诸如线程安全、死锁、接口阻塞等一系列复杂问题。本文将深度剖析多线程优化技巧、Future与FutureTask的源码、接口阻塞问题及解决方案,并通过具体业务场景和Java代码示例进行实战演示。
40 3
|
1月前
|
存储
让星星⭐月亮告诉你,HashMap的put方法源码解析及其中两种会触发扩容的场景(足够详尽,有问题欢迎指正~)
`HashMap`的`put`方法通过调用`putVal`实现,主要涉及两个场景下的扩容操作:1. 初始化时,链表数组的初始容量设为16,阈值设为12;2. 当存储的元素个数超过阈值时,链表数组的容量和阈值均翻倍。`putVal`方法处理键值对的插入,包括链表和红黑树的转换,确保高效的数据存取。
56 5
|
18天前
|
安全 测试技术 Go
Go语言中的并发编程模型解析####
在当今的软件开发领域,高效的并发处理能力是提升系统性能的关键。本文深入探讨了Go语言独特的并发编程模型——goroutines和channels,通过实例解析其工作原理、优势及最佳实践,旨在为开发者提供实用的Go语言并发编程指南。 ####
|
23天前
|
Go
|
1月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
70 0
|
1月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
57 0
下一篇
无影云桌面