讲透Go中的并发接收控制结构select

简介: 讲透Go中的并发接收控制结构select

本节源码位置 https://github.com/golang-minibear2333/golang/blob/master/4.concurrent/4.5-select


4.5.1 select与switch


让我们来复习一下switch语句,在switch语句中,会逐个匹配case语句(可以是值也可以是表达式),一个一个的判断过去,直到有符合的语句存在,执行匹配的语句内容后跳出switch。


func demo(number int){
    switch{
        case number >= 90:
        fmt.Println("优秀")
        default:
        fmt.Println("太搓了")
    }
}

而 select 用于处理通道,它的语法与 switch 非常类似。每个 case 语句里必须是一个 channel 操作。它既可以用于 channel 的数据接收,也可以用于 channel 的数据发送。

func foo() {
  chanInt := make(chan int)
  defer close(chanInt)
  go func() {
    select {
    case data, ok := <-chanInt:
      if ok {
        fmt.Println(data)
      }
    default:
      fmt.Println("全部阻塞")
    }
  }()
    time.Sleep(time.Second)
    chanInt <- 1
}


输出1


这是一个简单的接收发送模型。

如果 select 的多个分支都满足条件,则会随机的选取其中一个满足条件的分支。

第6行加上ok是因为上一节讲过,如果不加会导致通道关闭时收到零值。

回忆之前的知识,接收和发送应该在不同的goroutine里。

其次select default子协程,在case都处于阻塞状态时,会直接执行default的内容。导致子协程提前退出,主协程中的写入操作会一直阻塞(等待接收者,接收者已经退出了) 触发死锁

倒数第二行加了sleep 1秒,是因为让select语句提前结束的问题暴露出来。

全部阻塞
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.bar()

select 执行完了,退出了goroutine,而发送才刚刚执行到,没有与其匹配的接收,故死锁。

正确的做法是把接收套在循环里面。

func bar() {
  chanInt := make(chan int)
  defer close(chanInt)
  go func() {
    for {
      select {
          ...
      }
    }
  }()
  chanInt <- 1
}
  • 不再死锁了
  • 假如程序不停止,会出现一个泄露的goroutine,永远的在for循环中无法跳出,此时引入下一节的内容


4.5.2 通知机制


Go 语言总是简单和灵活的,虽然没有针对提供专门的机制来处理退出,但我们可以自己组合

func main() {
  chanInt, done := make(chan int), make(chan struct{})
  defer close(chanInt)
  defer close(done)
  go func() {
    for {
      select {
      case <-chanInt:
      case <-done:
        return
      }
    }
  }()
  done <- struct{}{}
}
  • 没有给chanInt发送任何东西,按理说会阻塞,导致goroutine泄露
  • 但可以使用额外的通道完成协程的退出控制
  • 这种方式还可以做到周期性处理任务,下一节我们再详细讲解


4.5.3 case执行原理


假如case后左边和右边跟了函数,会执行函数,我们来探索一下。


定义A、B函数,作用相同

func A() int {
  fmt.Println("start A")
  time.Sleep(1 * time.Second)
  fmt.Println("end A")
  return 1
}

定义函数lee,请问该函数执行完成耗时多少呢?

func lee() {
  ch, done := make(chan int), make(chan struct{})
  defer close(ch)
  go func() {
    select {
    case ch <- A():
    case ch <- B():
    case <-done:
    }
  }()
  done <- struct{}{}
}

答案是2秒


start A
end A
start B
end B
main.leespend time: 2.003504395s


select扫描是从左到右从上到下的,按这个顺序先求值,如果是函数会先执行函数。

然后立马判断是否可以立即执行(这里是指case是否会因为执行而阻塞)。

所以两个函数都会进入,而且是先进入A再进入B,两个函数都会执行完,所以等待时间会累计。

所以不应该在case判断中放函数。


如果都不会阻塞,此时就会使用一个伪随机的算法,去选中一个case,只要选中了其他就被放弃了。


4.5.4 超时控制


我们来模拟一个更真实点的例子,让程序一段时间超时退出。

定义一个结构体

type Worker struct {
  stream  <-chan int //处理
  timeout time.Duration //超时
  done    chan struct{} //结束信号
}

定义初始化函数

func NewWorker(stream <-chan int, timeout int) *Worker {
  return &Worker{
    stream:  stream,
    timeout: time.Duration(timeout) * time.Second,
    done:    make(chan struct{}),
  }
}

定义超时处理函

func (w *Worker) afterTimeStop() {
  go func() {
    time.Sleep(w.timeout)
    w.done <- struct{}{}
  }()
}


  • 超过时间发送结束信号

接收数据并处理函数

func (w *Worker) Start() {
  w.afterTimeStop()
  for {
    select {
    case data, ok := <-w.stream:
      if !ok {
        return
      }
      fmt.Println(data)
    case <-w.done:
      close(w.done)
      return
    }
  }
}
  • 收到结束信号关闭函数
  • 这样的方法就可以让程序在等待 1 秒后继续执行,而不会因为 ch 读取等待而导致程序停滞。
func main() {
  stream := make(chan int)
  defer close(stream)
  w := NewWorker(stream, 3)
  w.Start()
}

实际3秒到程序运行结束。好在官方已经考虑到这一点,为我们提供了现成的方案。


4.5.5 官方超时方案

go func() {
    t := time.NewTicker(timeout)
    defer t.Stop()
    for {
        select {
        case data := <-chanInt:
            t.Reset(timeout)
        case <-t.C:
        case <-done:
            return
        }
    }
}()


time.NewTicker创建了一个定时器,参数为时间间隔,并返回一个结构体t。

t.C 是一个仅可接收的channel,会根据时间间隔定时执行任务,也可以作为超时任务使用。

t.Reset(timeout) 重置时间,因为select进入一个case,后续的执行会有耗时,所以要重置时间保证时间的精准。

这种方式巧妙地实现了超时处理机制,这种方法不仅简单,在实际项目开发中也是非常实用的。


在生产中,常常把buf积累到一定数量然后flush出去,假如数据产生速度太慢,就要靠定时器定时消费,看下面完整的例子。

func main() {
  chanInt, done := make(chan int), make(chan struct{})
  defer close(chanInt)
  defer close(done)
  go func() {
    ...
  }()
  for i := 0; i < 100; i++ {
    if i%10 == 0 {
      time.Sleep(time.Second)
    }
    chanInt <- 1
  }
  done <- struct{}{}
}

产生100个数,每10个数暂停1秒,用来模拟数据产生速度慢,go func() 内容如下:

go func() {
    timeout := time.Second
    t := time.NewTicker(timeout)
    defer t.Stop()
    buf := make([]int, 0, 5)
    for {
        select {
        case data := <-chanInt:
            t.Reset(timeout)
            if len(buf) < cap(buf) {
                buf = append(buf, data)
            } else {
                go send(buf)
                buf = make([]int, 0, cap(buf))
            }
        case <-t.C:
            if len(buf) > 0 {
                go send(buf)
                buf = make([]int, 0, cap(buf))
            }
        case <-done:
            return
        }
    }
}()


  • 接收到数据时,如果buf满了就进行上报,如果buf没满就追加数据。
  • 假如超时,就直接发送buf防止数据太少一直不发送的情况。
  • 需要在其他case里,Reset超时时间,以校准定时器。


4.5.6 小结


本节介绍了select的用法以及包含的陷阱,我们学会了:


case只针对通道传输阻塞做特殊处理,如果有计算将会先进行计算,所以不应该在case判断中放函数。

扫描是从左到右从上到下的,按这个顺序先求值,如果是函数会先执行函数。如果函数运行时间长,时间会累计。

在case全部阻塞时,会执行default中的内容。

可使用结束信号,让select退出。

延时发送结束信号可以实现超时自动退出的功能。

官方的time包,提供了定时器,可作定时任务,也可作超时控制。


我还写了可热更新的定时器,有兴趣了解的可以看看本节的源码哦。

相关文章
|
5月前
|
算法 Java Go
【GoGin】(1)上手Go Gin 基于Go语言开发的Web框架,本文介绍了各种路由的配置信息;包含各场景下请求参数的基本传入接收
gin 框架中采用的路优酷是基于httprouter做的是一个高性能的 HTTP 请求路由器,适用于 Go 语言。它的设计目标是提供高效的路由匹配和低内存占用,特别适合需要高性能和简单路由的应用场景。
492 4
|
10月前
|
人工智能 安全 算法
Go入门实战:并发模式的使用
本文详细探讨了Go语言的并发模式,包括Goroutine、Channel、Mutex和WaitGroup等核心概念。通过具体代码实例与详细解释,介绍了这些模式的原理及应用。同时分析了未来发展趋势与挑战,如更高效的并发控制、更好的并发安全及性能优化。Go语言凭借其优秀的并发性能,在现代编程中备受青睐。
322 33
|
5月前
|
存储 监控 算法
基于 Go 语言跳表结构的局域网控制桌面软件进程管理算法研究
针对企业局域网控制桌面软件对海量进程实时监控的需求,本文提出基于跳表的高效管理方案。通过多级索引实现O(log n)的查询、插入与删除性能,结合Go语言实现并发安全的跳表结构,显著提升进程状态处理效率,适用于千级进程的毫秒级响应场景。
236 15
|
9月前
|
人工智能 监控 Kubernetes
Go语言在select语句中实现优先级
Go语言中的`select`语句用于监控多个Channel的发送或接收操作,选择就绪的分支执行。它支持多种使用场景,如空`select`永久阻塞、单`case`阻塞读写、多`case`随机选择、配合`default`实现非阻塞操作等。通过嵌套`select`还可实现执行优先级,适用于如Kubernetes中任务调度等实际场景。
111 0
|
6月前
|
存储 监控 算法
企业电脑监控系统中基于 Go 语言的跳表结构设备数据索引算法研究
本文介绍基于Go语言的跳表算法在企业电脑监控系统中的应用,通过多层索引结构将数据查询、插入、删除操作优化至O(log n),显著提升海量设备数据管理效率,解决传统链表查询延迟问题,实现高效设备状态定位与异常筛选。
178 3
|
6月前
|
存储 Java 编译器
对比Java学习Go——程序结构与变量
本节对比了Java与Go语言的基础结构,包括“Hello, World!”程序、代码组织方式、入口函数定义、基本数据类型及变量声明方式。Java强调严格的面向对象结构,所有代码需置于类中,入口方法需严格符合`public static void main(String[] args)`格式;而Go语言结构更简洁,使用包和函数组织代码,入口函数为`func main()`。两种语言在变量声明、常量定义、类型系统等方面也存在显著差异,体现了各自的设计哲学。
253 0
|
9月前
|
存储 Go 开发者
Go 语言中如何处理并发错误
在 Go 语言中,并发编程中的错误处理尤为复杂。本文介绍了几种常见的并发错误处理方法,包括 panic 的作用范围、使用 channel 收集错误与结果,以及使用 errgroup 包统一管理错误和取消任务,帮助开发者编写更健壮的并发程序。
185 4
Go 语言中如何处理并发错误
|
9月前
|
Go
理解 Go 语言中的 select 用法
本文深入解析了Go语言中`select`的用法,它类似于`switch case`,但仅用于通道(channel)的操作。文章通过多个示例说明了`select`的基本用法、避免死锁的方法、随机性特点以及如何实现超时机制。同时总结了`select`与`switch`的区别:`select`专用于通道操作,case执行是随机的,需注意死锁问题,且不支持`fallthrough`和函数表达式。
339 1
理解 Go 语言中的 select 用法
|
8月前
|
存储 安全 算法
Go语言泛型-泛型对代码结构的优化
Go语言自1.18版本引入泛型,极大提升了代码的通用性与可维护性。通过泛型,开发者可以减少重复代码、提高类型安全性,并增强程序的复用性和可读性。本文详细介绍了泛型在数据结构、算法及映射功能中的应用,展示了其在优化代码结构方面的优势。同时,Go编译器对泛型代码进行类型推导,确保运行时性能不受影响。合理使用泛型,有助于构建更加灵活高效的程序。
|
10月前
|
Go C++
Go语言方法与接收者 -《Go语言实战指南》
本文介绍了 Go 语言中方法的相关概念和用法。方法是绑定到特定类型上的函数,包含值接收者和指针接收者两种形式。值接收者不会改变原始数据,而指针接收者可修改原始数据,且在处理大型结构体时性能更优。文章详细对比了方法与普通函数的区别,并说明了选择指针接收者的原因,如修改原始值、提升性能及保持一致性。此外,Go 支持为任意自定义类型定义方法,不仅限于结构体。最后通过表格总结了方法的核心概念和使用场景。
272 34

热门文章

最新文章