云原生系列Go语言篇-并发 Part 2

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
性能测试 PTS,5000VUM额度
应用实时监控服务-用户体验监控,每月100OCU免费额度
简介: 既然已经讲解了Go为并发所提供的基础工具,我们就来学习一些并发的最佳实践和模式吧。

并发实践和模式

既然已经讲解了Go为并发所提供的基础工具,我们就来学习一些并发的最佳实践和模式吧。

保持API无并发

并发是一种实现细节,好的API设计应当尽可能隐藏实现细节。这样在修改代码时无需修改其调用方式。

在实践中,这意味着永远不要在API的类型、函数及方法中暴露通道或互斥锁(我们会在何时用互斥锁替换通道中讨论互斥锁)。如果暴露了通道,就将通道管理的职责交给API的使用者了。这表示使用者要关心通道是否有缓冲、是否关闭或是nil。还有可能因访问通道或互斥锁的顺序出问题而导致死锁。

注:这并不是说不能将通道作为函数参数或结构体参数。只是说不应导出。

这一规则也有一些例外。如果API是一个带有并发帮助函数的库(比如time.After,我们会在如何让代码超时一节中使用),通道就会是API的一部分。

协程、for循环及各种变量

大部分时候,用于启动协程的闭包没有任何参数。它是通过声明它的环境中捕获变量。有一个通用场景这种方法不适用,也就是尝试从获取for循环的索引或值时。以下代码包含一个隐藏的bug:

func main() {
    a := []int{2, 4, 6, 8, 10}
    ch := make(chan int, len(a))
    for _, v := range a {
        go func() {
            ch <- v * 2
        }()
    }
    for i := 0; i < len(a); i++ {
        fmt.Println(<-ch)
    }
}

我们为a中的每个开启一个协程。看起来我们为每个协程传递了不同的值,但运行代码得到的结果却是:

20
20
20
20
20

每个协程对ch所写入的都是20的原因是,每个协程的闭包获取的是同一个变量。for循环中的索引和值变量在每次迭代中是复用的。最后一次对v所赋的值是10。运行协程时,这就是对协程可见的值。这一问题不只是对for循环,只要协程依赖的变量的值有可能发生变化,就必须将值传递给协程。有两种实现方式。第一种是在循环内遮蔽该值:

for _, v := range a {
    v := v
    go func() {
        ch <- v * 2
    }()
}

如果希望避免遮蔽,让代码流更为清晰,也可以把值作为参数传递给协程:

for _, v := range a {
    go func(val int) {
        ch <- val * 2
    }(v)
}

小贴士:在协程使用的变量值会发生变化时,可以把值作为参数传递给协程:

一定要清理好协程

在启动协程函数时,必须要保证它最终会退出。与变量不同,Go运行时无法监测到协程是否不再使用。如果协程不退出,调度器仍然会定期给它时间,什么工作也不做,这会拖慢程序。这称为协程泄漏(goroutine leak)。

协程是否会退出可能并不那么明显。比如,使用协程作为生成器:

func countTo(max int) <-chan int {
    ch := make(chan int)
    go func() {
        for i := 0; i < max; i++ {
            ch <- i
        }
        close(ch)
    }()
    return ch
}
func main() {
    for i := range countTo(10) {
        fmt.Println(i)
    }
}

注:这只是一个简短示例,不要使用协程生成数字列表。操作太过简单,违反了我们“何时使用并发”的指导方针。

在这个常见用例中,我们使用所有值的地方协程退出了。但如果循环过早退出,协程就会一直阻塞,等待从通道中读取值:

func main() {
    for i := range countTo(10) {
        if i > 5 {
            break
        }
        fmt.Println(i)
    }
}

done通道模式

done通道模式提供了一种发送信号通知协程停止进程的方式。它使用一个通道来发送退出信号。我们来看向多个函数发送相同数据、但只需要最快函数的结果的示例:

func searchData(s string, searchers []func(string) []string) []string {
    done := make(chan struct{})
    result := make(chan []string)
    for _, searcher := range searchers {
        go func(searcher func(string) []string) {
            select {
            case result <- searcher(s):
            case <-done:
            }
        }(searcher)
    }
    r := <-result
    close(done)
    return r
}

在我的函数中,声明了一个名称为done的通道,包含struct{}类型的数据。我们使用了空结构体类型,因为其值并不重要,我们不会向该通道写入,只是会关闭它。我们为每个传入的搜索函数开启一个协程。worker协程中的select语句会等待对result通道的写入(在searcher函数返回之时)或是对done通道的读取。回顾下读取开启的通道会等待有数据可读并且读取已关闭通知总是会返回通道的零值。这意味着从done读取的分支会在关闭done前保持等待状态。在searchData中,我们读取第一个写入result的值,然后关闭done。这会向协程发送信息让其退出,防止协程泄漏。

有时希望根据调用栈中前面函数中的内容来停止协程。在上下文一章中,我们会学习如何使用上下文来告知一个或多个协程该关闭了。

使用cancel函数来终止协程

我们也可以使用done通道来实现函数一章中所看到的一种模式:与通道一起返回撤销函数。我们回到前面的countTo示例来了解是如何使用的。撤销函数必须在for循环之后调用:

func countTo(max int) (<-chan int, func()) {
    ch := make(chan int)
    done := make(chan struct{})
    cancel := func() {
        close(done)
    }
    go func() {
        for i := 0; i < max; i++ {
            select {
            case <-done:
                return
            case ch<-i:
            }
        }
        close(ch)
    }()
    return ch, cancel
}
func main() {
    ch, cancel := countTo(10)
    for i := range ch {
        if i > 5 {
            break
        }
        fmt.Println(i)
    }
    cancel()
}

countTo函数创建了两个通道,一个返回数据,另一个发出完成的信息。这里没有直接返回完成通道,而是创建一个关闭完成通道的闭包并返回该闭包。通过闭包来撤销让我们可以在需要时执行一些额外的清理工作。

何时使用缓冲和无缓冲通道

掌握Go并发最复杂的一项技术是决定何时使用缓冲通道。默认,通道是无缓冲的,这很容易理解:一个协程写入并等待另一个协程接收,就像是接力赛中的接力棒一样。缓冲通道就更复杂了。需要选择大小,因为缓冲通道中的缓冲是有限度的。恰当的使用缓冲通道意味着我们必须处理缓冲满了写入协程等待读取的阻塞情况。那怎样算是恰当地使用缓冲通道呢?

缓冲通道的场景很微妙。可以一句话总结如下:

缓冲通道用于的场景是知道要启动多少个协程、希望限定启动的协程的数量或是限定排队处理任务的数量。

缓冲通道可很好处理的任务有从一组所启动的协程中收集数据或是希望限制并发的使用。它们有助于管理系统中排队的任务数量、防止服务来不及处理而崩溃。下面有一些示例可展示其使用场景。

第一个例子中,我们处理通道上的前10条结果。这时我们启动10个协程,每个协程将结果写入到缓冲通道上:

func processChannel(ch chan int) []int {
    const conc = 10
    results := make(chan int, conc)
    for i := 0; i < conc; i++ {
        go func() {
            v := <- ch
            results <- process(v)
        }()
    }
    var out []int
    for i := 0; i < conc; i++ {
        out = append(out, <-results)
    }
    return out
}

我们确切地知道所启动的协程数量,并且希望每个协程在完成任务后退出。这表示我们可以为每个启动协程创建一个带一个空间的缓冲通道,并让每个协程无阻塞地写入到这个协程。可以遍历这个缓冲通道,读取其中写入的值。读取完所有值后,返回结果,我们知道不会产生协程泄漏。

背压(backpressure)

另一项可通过缓冲通道实现的技术是背压机制。这有些反直觉,但在组件限定了希望执行的工作量后系统的性能会整体变好。我们可以使用缓冲通道和select语句来限定系统中同步请求的数量:

type PressureGauge struct {
    ch chan struct{}
}
func New(limit int) *PressureGauge {
    ch := make(chan struct{}, limit)
    for i := 0; i < limit; i++ {
        ch <- struct{}{}
    }
    return &PressureGauge{
        ch: ch,
    }
}
func (pg *PressureGauge) Process(f func()) error {
    select {
    case <-pg.ch:
        f()
        pg.ch <- struct{}{}
        return nil
    default:
        return errors.New("no more capacity")
    }
}

在这段代码中,我们创建了一个带缓冲通道结构体,具有一些“令牌”和一个函数。每次协程希望使用函数时,它会调用Processselect尝试从通道读取令牌。如果可以读取则运行函数,并将令牌返回给缓冲通道。如果无法读取到令牌,则运行default分支,就会返回错误。下面有一个快速示例对内置的HTTP服务器使用这段代码(我们会在标准库一章学习到如何使用HTTP服务器):

func doThingThatShouldBeLimited() string {
    time.Sleep(2 * time.Second)
    return "done"
}
func main() {
    pg := New(10)
    http.HandleFunc("/request", func(w http.ResponseWriter, r *http.Request) {
        err := pg.Process(func() {
            w.Write([]byte(doThingThatShouldBeLimited()))
        })
        if err != nil {
            w.WriteHeader(http.StatusTooManyRequests)
            w.Write([]byte("Too many requests"))
        }
    })
    http.ListenAndServe(":8080", nil)
}

关闭select中的分支

在需要从多个并发源中合并数据时,select关键字可完美胜任。但需要适当地处理关闭的通道。如果select中的一个分支在读取关闭的通道,总是会成功,返回的是零值。每次选取一个分支时,需要检测值是有效的并跳过分支。如果读取出现问题,程序会浪费大量时间读取垃圾值。

这时,我们依赖这样的错误:读取一个nil通道。前面学到过,读取或写入nil通道会导致代码永远挂起。虽然在由bug引发时会很糟糕,但我们可以使用nil通道来让select中的case无效。在监测到通道关闭时,将通道变量设置为nil。关联的分支就无法运行,因为从nil通道读取不会返回任何值:

// in和in2都是通道, done是完结channel.
for {
    select {
    case v, ok := <-in:
        if !ok {
            in = nil // 这一分支永远不再会成功!
            continue
        }
        // 处理从in中读取的v
    case v, ok := <-in2:
        if !ok {
            in2 = nil // 这一分支永远不再会成功!
            continue
        }
        // 处理从in2中读取的v
    case <-done:
        return
    }
}

如何让代码超时

大部分交互程序需要在一定时间内返回响应。Go并发可以做的一个任务是管理请求(或请求的一部分)要运行多长时间。其它语言在promise和future之上引入了额外的特性来添加这一功能,但Go的超时语句展示了如何通过已有功能构建复杂的特性。我们来一窥究竟:

func timeLimit() (int, error) {
    var result int
    var err error
    done := make(chan struct{})
    go func() {
        result, err = doSomeWork()
        close(done)
    }()
    select {
    case <-done:
        return result, err
    case <-time.After(2 * time.Second):
        return 0, errors.New("work timed out")
    }
}

在需要对Go中的操作进行限时时,就会看到这一模式的变体。这里的select有两个分支。第一个分支使用了前面学过的完结通道模式。我们使用协程闭包来对resulterr赋值,并关闭done通道。如果done通道先关闭了,对done的读取成功并返回该值。

第二个通道由time包中的After函数返回。在传递完指定的time.Duration之后会写入一个值。(我们会在标准库一章中讲到time包)。在doSomeWork完成前读取到这个值时,timeLimit会返回超时错误。

注:如果在协程完成处理前退出timeLimit,协程会继续运行。我们只是不再对其(最终)返回的结果进行处理。如果希望停止不再等待的协程的任务,可使用上下文撤销。在上下文一章中会进行讨论。

使用WaitGroup

有时一个协程需要等待多个协程先完成任务。如果等待的是单个协程,可以使用之前学习的完结通道模式。但如果等待的是多个协程,就需要使用WaitGroup,它位于标准库的sync包中。下面是一个简单示例,可在The Go Playground中运行:

func main() {
    var wg sync.WaitGroup
    wg.Add(3)
    go func() {
        defer wg.Done()
        doThing1()
    }()
    go func() {
        defer wg.Done()
        doThing2()
    }()
    go func() {
        defer wg.Done()
        doThing3()
    }()
    wg.Wait()
}

sync.WaitGroup声明时无需进行初始化,因为其零值也是有用的。sync.WaitGroup有三个方法:Add用于增加所等待的协程数;Done用于减少其计数器,在协程完成时调用;Wait等待协程直到计数器变为0。Add通常只调用一次,传递的是要启动的协程数。Done在协程内调用。要保证即使协程崩溃也会被调用,我们使用了defer

读者会注意到我们没有显式传递sync.WaitGroup。有两个原因。其一是必须保证所有使用sync.WaitGroup的地方都使用的是同一个实例。如传将sync.WaitGroup传递给协程函数而又没使用指针,那么函数得到的就是一个拷贝,Done就不会减少原始sync.WaitGroup的计算器。通过使用闭包来获取sync.WaitGroup,就能保证所有的协程都指向同一个实例。

其二是出于设计原因。还记得我们应将并发保留在API之外吧。在前面的通道里我们看到,通常的模式是使用包含业务逻辑的闭包启动协程。闭包管理并发的问题而函数提供算法。

我们再来看一个更真实的示例。前面提到在多个协程写入同一个通道时,我们需要确保所写入的通道只会关闭一次。sync.WaitGroup就很能胜任这一要求。我们来看并发处理通道中值、将结果收集到切片再返回切片的函数是如何工作的:

func processAndGather(in <-chan int, processor func(int) int, num int) []int {
    out := make(chan int, num)
    var wg sync.WaitGroup
    wg.Add(num)
    for i := 0; i < num; i++ {
        go func() {
            defer wg.Done()
            for v := range in {
                out <- processor(v)
            }
        }()
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    var result []int
    for v := range out {
        result = append(result, v)
    }
    return result
}

在这个例子中,我们启动了监控协程等待所有处理的协程退出。在都退出时,监控协程会对输出通道调用close。在out关闭及缓冲为空时for-range通道循环会退出。最后,函数返回处理所得到值。

虽然WaitGroup很方便,在调配协程时不应将其作为首选。仅在所有工作协程退出后需要进行清理时(比如关闭写入的通道)才使用它。

GOLANG.ORG/X和ERRGROUP

Go作者维护了一些补充标准库的工具。整体称为golang.org/x包,包含有一个ErrGroup类型,构建于WaitGroup之上用于创建一组在其中之一出现问题就停止处理的协程。阅读ErrGroup文档了解更多内容。

代码精确地只运行一次

init函数:能免则免中我们讲到,init应保留用于初始化有效的不可变包级状态。但有时我们希望进行懒加载,或是有些代码要求在程序运行后只初始化一次。这通常是因为初始化相对较慢,甚至是并不是每次运行时都需要。sync包有一个方便的类型Once,实现了这一功能。我们来快速看看如何使用:

type SlowComplicatedParser interface {
    Parse(string) string
}
var parser SlowComplicatedParser
var once sync.Once
func Parse(dataToParse string) string {
    once.Do(func() {
        parser = initParser()
    })
    return parser.Parse(dataToParse)
}
func initParser() SlowComplicatedParser {
    // 在这里做各种配置和加载
}

我们声明了两个包级变量,parser的类型为Compl⁠ica⁠tedParseronce的类型为sync.Once。类似sync.WaitGroup,我们不需要配置sync.Once的实例(这称为让零值有价值)。还是类似sync.WaitGroup,我们必须保证不生成sync.Once的拷贝,因为每个拷贝都使用其自身的状态来表明是否已使用。通常不应在函数内声明sync.Once实例,因为每次函数调用会创建新实例,并不会记录之前的调用。

在本例,我们希望确保parser只初始化了一次,因我们在传递给onceDo方法内设置了parser的值。如果Parse调用了多次,once.Do不会反复执行闭包。

组合并发工具

我们回到本章第一节中的示例。有一个函数调用三个web服务。我们向其中两个服务发送数据,然后接收这两个调用的结果发送给第三个服务,返回结果 。整个过程要小于50毫秒,否则返回错误。

先从调用的函数开始:

func GatherAndProcess(ctx context.Context, data Input) (COut, error) {
    ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
    defer cancel()
    p := processor{
        outA: make(chan AOut, 1),
        outB: make(chan BOut, 1),
        inC:  make(chan CIn, 1),
        outC: make(chan COut, 1),
        errs: make(chan error, 2),
    }
    p.launch(ctx, data)
    inputC, err := p.waitForAB(ctx)
    if err != nil {
        return COut{}, err
    }
    p.inC <- inputC
    out, err := p.waitForC(ctx)
    return out, err
}

首先我们设置了50毫秒超时的上下文。在没上下文时,使用其计时器而不是调用time.After。使用上下文计时器的一个好处是它让我们可以考虑调用该函数的函数所设定的超时。我们会在上下文一章讨论上下文,并在其中的计时器一节详细讲解超时的使用。现在读者只需要知道超时后会取消上下文。上下文的Done会返回上下文撤销时返回值的通道,取消可以是超时或显式调用上下文的取消方法。

在创建上下文之后,我们使用defer来确保会调用上下文的cancel函数。在上下文一章中撤销一节中会讲到,必须调用这一函数,否则会出现资源泄漏。

然后会通过一系列用于与协程通讯的通道来填充processor实例。每个通道都有缓冲,因此执行写入的协程可以完成写入不等待读取就退出。(errs通道缓冲大小为2,因为写入时可能会产生两个错误。)

processor结构如下:

type processor struct {
    outA chan AOut
    outB chan BOut
    outC chan COut
    inC  chan CIn
    errs chan error
}

接着,我们对processor调用launch方法来开启三个协程:一个用于调用getResultA,一个调用getResultB,还有一个调用getResultC

func (p *processor) launch(ctx context.Context, data Input) {
    go func() {
        aOut, err := getResultA(ctx, data.A)
        if err != nil {
            p.errs <- err
            return
        }
        p.outA <- aOut
    }()
    go func() {
        bOut, err := getResultB(ctx, data.B)
        if err != nil {
            p.errs <- err
            return
        }
        p.outB <- bOut
    }()
    go func() {
        select {
        case <-ctx.Done():
            return
        case inputC := <-p.inC:
            cOut, err := getResultC(ctx, inputC)
            if err != nil {
                p.errs <- err
                return
            }
            p.outC <- cOut
        }
    }()
}

getResultAgetResultB的协程差不多。它们分别调用各自的方法。如果返回了错误,将错误写入p.errs通道。如果返回了有效值,将值写入通道中(getResultA的结果写入p.outAgetResultB的结果写入p.outB)。

因为只有在getResultAgetResultB成功并且在50毫秒内完成才调用getResultC,第三个协程稍显复杂。它包含带两个分支的select。第一个在上下文撤销时触发。第二个在调用getResultC的数据存在时触发。如果数据存在,函数进行了调用,这个逻辑与前两个协程的逻辑类似。

在协程启动后,我们调用processorwaitForAB方法:

func (p *processor) waitForAB(ctx context.Context) (CIn, error) {
    var inputC CIn
    count := 0
    for count < 2 {
        select {
        case a := <-p.outA:
            inputC.A = a
            count++
        case b := <-p.outB:
            inputC.B = b
            count++
        case err := <-p.errs:
            return CIn{}, err
        case <-ctx.Done():
            return CIn{}, ctx.Err()
        }
    }
    return inputC, nil
}

这使用for-select循环来对CIn实例同时也是的getResultC参数inputC赋值。共4个分支。前两个读取前两个协程所写入的通道并对inputC的字段赋值。如果这两个分支都执行了,我们会退出for-select循环并返回inputC的值,和nil错误。

后两个分支处理错误条件。如果p.errs通道中写入了错误,就返回该错误。如果上下文被撤销了,我们返回表示请求被撤销的错误。

回到GatherAndProcess,我们执行了一个标准的nil错误检测。如果正常,将inputC的值写入p.inC通道,然后调用processorwaitForC方法:

func (p *processor) waitForC(ctx context.Context) (COut, error) {
    select {
    case out := <-p.outC:
        return out, nil
    case err := <-p.errs:
        return COut{}, err
    case <-ctx.Done():
        return COut{}, ctx.Err()
    }
}

这个方法包含一个select。如果getResultC成功完成,我们从p.outC通道读取输出并返回。如果getResultC返回错误,我们从p.errs读取错误并返回。最后,如果上下文被撤销了,我们返回一个相应的错误。在waitForC完成后,GatherAndProcess将结果返回给其调用者。

如果确定getResultC的作者会做正确的事,代码可进行简化。因为上下文传递给了getResultC,该函数可以考虑超时进行写入,在超时后返回错误。这样,我们可以在GatherAndProcess中直接调用getResultC。这就可以去掉processor中的inCoutClaunch中的一个协程以及整个waitForC方法。总的原则是在程序正确的情况下使用尽量少的并发。

通过使用协程、通道和select语句架构代码,我们分成了不同的步骤,允许各部分以任意顺序运行和完成,并且在各部分间清晰地交的数据。此外我们还保障了程序的任意部分不会挂起,并且恰当地处理了函数本身及调用历史中其它函数的超时。如果不相信这是实现并发更好的方法,请尝试使用其它语言进行实现。可能会惊讶于其实现难度。

何时用互斥锁替换通道

如在其它编程语言中调配跨线程数据访问,可能会使用互斥锁(mutex-mutual exclusion的缩写)。互斥锁的任务是限制一些代码的并发执行或是访问同一块数据。所保护的部分称为关键段(critical section)。

Go作者们设计通道和select来管理并发有很多很好的原因。互斥锁的主要问题是它模糊了程序内的数据流。数据通过一系列通道从一个协程传入另一个协程时,数据流是清晰的。对值的访问在一段时间内会本地化某个协程中。在使用互斥锁保护一个值时,无法表明哪个协程当前拥有值的所有权,因为对值的访问由所有并发进程共享。这就很难理解处理顺序。Go社区中有一个描述这一哲学的名言:“通过通信共享内存,而不是通过共享内存来通信”。

话虽如此,有时使用互斥锁会更为清晰,所以Go标准库包含了适用这些场景的互斥锁实现。最常见的情况是协程读取或写入一个共享值,但不对值进行处理。我们以多玩家游戏的内存计分板为例。首先看如何使用通道实现。下面是一个可使用协程启动管理计分板的函数:

func scoreboardManager(in <-chan func(map[string]int), done <-chan struct{}) {
    scoreboard := map[string]int{}
    for {
        select {
        case <-done:
            return
        case f := <-in:
            f(scoreboard)
        }
    }
}

该函数声明了一个字典,然后监听通道中读取或修改字典的函数,以及一个确定何时关闭的通道。我们创建类型和将值写入字典的方法:

type ChannelScoreboardManager chan func(map[string]int)
func NewChannelScoreboardManager() (ChannelScoreboardManager, func()) {
    ch := make(ChannelScoreboardManager)
    done := make(chan struct{})
    go scoreboardManager(ch, done)
    return ch, func() {
        close(done)
    }
}
func (csm ChannelScoreboardManager) Update(name string, val int) {
    csm <- func(m map[string]int) {
        m[name] = val
    }
}

更新方法非常简洁,只是传递一个将值放入字典的函数。但怎么读取计分板呢?我们需要返回一个值。这意味着使用完结模式等待传入ScoreboardManager的函数完成运行:

func (csm ChannelScoreboardManager) Read(name string) (int, bool) {
    var out int
    var ok bool
    done := make(chan struct{})
    csm <- func(m map[string]int) {
        out, ok = m[name]
        close(done)
    }
    <-done
    return out, ok
}

虽然代码运行正常,但这很笨重并且一次只能有一个读取器。更好的方法是使用互斥锁。标准库中有两个互斥锁实现,都位于sync包中。第一个名为Mutex,它有两个方法LockUnlock。只要另一个协程处于关键段调用Lock会导致当前协程暂停。在清楚了关键段后,当前协程会获取到锁,关键段中的代码会执行。调用Mutex中的Unlock方法标志着关键段的终结。

第二种互斥锁的实现名为RWMutex,它让我们获取读锁和写锁。关键段中一次只能获取一个writer,但读锁是共享的,关键段中一次可获取多个reader。写锁通过LockUnlock方法来管理,而读锁由RLockRUnlock方法管理。

在获取互斥锁时,必须要确保你会释放锁。在调用LockRLock后使用defer语句来调用Unlock

type MutexScoreboardManager struct {
    l          sync.RWMutex
    scoreboard map[string]int
}
func NewMutexScoreboardManager() *MutexScoreboardManager {
    return &MutexScoreboardManager{
        scoreboard: map[string]int{},
    }
}
func (msm *MutexScoreboardManager) Update(name string, val int) {
    msm.l.Lock()
    defer msm.l.Unlock()
    msm.scoreboard[name] = val
}
func (msm *MutexScoreboardManager) Read(name string) (int, bool) {
    msm.l.RLock()
    defer msm.l.RUnlock()
    val, ok := msm.scoreboard[name]
    return val, ok
}

我们已经看到互斥锁的实现了,请在使用时仔细考虑你的选择。Katherine Cox-Buday杰出的《Go语言并发之道》中有一个决策树,可帮助我们决定该使用通道还是互斥锁:

  • 如果在调配协程或追踪由一系列协程所转化的值,使用通道。
  • 如果共享对结构体中字段的访问,使用互斥体。
  • 如果在使用通道时发现严重性能问题(参见编写测试一章的基准测试),并且无法找到其它方法修复这一问题,将代码修改为使用互斥锁。

因为计分板是结构体中的一个字段,没有对计分板的传输,使用互斥锁在情理之中。这里使用互斥锁很好,因为数据在内存中存储。如果数据存储在外部服务中,比如在HTTP服务器或数据库中,不要使用互斥锁来守卫对系统的访问。

互斥锁要求我们做更多的管理。比如,必须正确地配对加锁和解锁,否则程序可能会死锁。我们示例在同一个方法中获取并释放了锁。另一个问题是Go中互斥锁并不是可重入的(reentrant)。如果一个协程尝试重复获取同一个锁,会出现死锁,等待它自己释放锁。这与Java这类语言不同,它们的锁是可重入的。

不可重入锁让递归调用自己的函数获取锁变得麻烦。必须在递归函数调用前释放锁。总之,在持有锁时注意函数的调用,因为不知道在这些调用中会获取哪些锁。如果函数调用了另一个尝试获取同一把锁的函数,协程就会死锁。

sync.WaitGroupsync.Once一样,不要拷贝互斥锁。如果将它们传入函数或以结构体中的一个字段进行访问,必须通过指针。如果拷贝了互斥锁,其锁无法共享。

警告:不要尝试用多个协程访问同一个变量,除非先获得到了该变量的互斥锁。它可能会导致难以追踪的奇怪错误。参见编写测试一章中的通过竞争检测查找并发问题来学习如何监测这些问题。

SYNC.MAP-这是不你以为的字典

在查看sync包时,会发现一个名为的Map的类型。它提供了Go内置的map的并发安全版本。因其实现中所做的权衡,sync.Map仅适用于特定场景:

  • 在共享字典中键值对只插入一次但读取多次时
  • 在协程共享字典,但不访问彼此的键和值时

此外,因为Go早期没有泛型,sync.Map使用interface{}作为其键和值的类型,编译器无法帮助我们确定所使用的正确的数据类型。

因为有这些限制,在极少数场景中我们需要在多个协程间共享字典,使用由sync.RWMutex保护的内置map

Atomic-你可能用不上

除了互斥锁,Go提供了其它方式可保持跨线程的数据一致性。sync/atomic包提供了对内置到现代CPU中原子变量运算的访问,用于增加、交换、加载、存储或比较交换(CAS)一个能装到单个寄存器中的值。

如果需要压榨出最后一点性能,并且是编写并发代码的专家,你会乐于见到Go包含对原子运算的支持。对于剩下的人,请使用协程和互斥锁管理并发需求。

在哪里深入学习并发

这里我们讲解了一些简单并发模式,但还有很多其它知识。事实上,可以写一整本书来讲解正确实现Go中各种并发模式,所幸Katherine Cox-Buday就写了这样一本书。前面在讨论该决定使用通道还是互斥锁时已经提到了这本书,《Go语言并发之道》,它对于与Go和并发相关的知识都是很好的读物。可以阅读这本书学习更多知识。

小结

本章中,我们讲解了并发并学习了为什么Go的方式比其它的传统并发机制更简单。在讲解过程中,我们还说明了什么时候该使用并发以及一些并发规则和模式。下一章中,我们会快速学习Go的标准库,它全面拥抱现代计算机的“内置电池”价值观。

本文来自正在规划的Go语言&云原生自我提升系列,欢迎关注后续文章。

相关文章
|
15天前
|
存储 Go 索引
go语言中数组和切片
go语言中数组和切片
26 7
|
15天前
|
Go 开发工具
百炼-千问模型通过openai接口构建assistant 等 go语言
由于阿里百炼平台通义千问大模型没有完善的go语言兼容openapi示例,并且官方答复assistant是不兼容openapi sdk的。 实际使用中发现是能够支持的,所以自己写了一个demo test示例,给大家做一个参考。
|
15天前
|
程序员 Go
go语言中结构体(Struct)
go语言中结构体(Struct)
92 71
|
14天前
|
存储 Go 索引
go语言中的数组(Array)
go语言中的数组(Array)
100 67
|
17天前
|
Go 索引
go语言for遍历数组或切片
go语言for遍历数组或切片
88 62
|
19天前
|
并行计算 安全 Go
Go语言中的并发编程:掌握goroutines和channels####
本文深入探讨了Go语言中并发编程的核心概念——goroutine和channel。不同于传统的线程模型,Go通过轻量级的goroutine和通信机制channel,实现了高效的并发处理。我们将从基础概念开始,逐步深入到实际应用案例,揭示如何在Go语言中优雅地实现并发控制和数据同步。 ####
|
15天前
|
存储 Go
go语言中映射
go语言中映射
32 11
|
17天前
|
Go
go语言for遍历映射(map)
go语言for遍历映射(map)
29 12
|
16天前
|
Go 索引
go语言使用索引遍历
go语言使用索引遍历
26 9
|
20天前
|
安全 Serverless Go
Go语言中的并发编程:深入理解与实践####
本文旨在为读者提供一个关于Go语言并发编程的全面指南。我们将从并发的基本概念讲起,逐步深入到Go语言特有的goroutine和channel机制,探讨它们如何简化多线程编程的复杂性。通过实例演示和代码分析,本文将揭示Go语言在处理并发任务时的优势,以及如何在实际项目中高效利用这些特性来提升性能和响应速度。无论你是Go语言的初学者还是有一定经验的开发者,本文都将为你提供有价值的见解和实用的技巧。 ####