简介
在golang中,channel用于并发控制、广播、数据交换等。
每个channel内部有3个队列:接收、发送协程队列和环形缓冲队列。
Channel操作包括读、写和关闭,与3种状态组合有9种场景。
例如,关闭的channel读取会得到0值,写入会panic。
可以使用-, ok
模式判断channel是否关闭,select
用于处理多个channel。
缓冲channel能增强并发,超时控制结合select
避免阻塞。
1 channel 3个状态
未初始化状态,只进行声明,或者手动赋值 nil
nil
正常channel 可读或可写
active
已关闭,千万不要误认为 channel关闭后,值为nil
closed
使用场景
广播,如消费者/生产者模型
交换数据
并发控制
显示通知等
每个channel内部实现都有三个队列
接收消息的协程队列。
这个队列的结构是一个限定最大长度的链表,所有阻塞在channel的接收操作的协程都会被放在这个队列里。
发送消息的协程队列。
这个队列的结构也是一个限定最大长度的链表。所有阻塞在channel的发送操作的协程也都会被放在这个队列里。
环形数据缓冲队列。
这个环形数组的大小就是channel的容量。如果数组装满了,就表示channel满了,如果数组里一个值也没有,就表示channel是空的。对于一个阻塞型channel来说,它总是同时处于即满又空的状态。
2 channel 3个操作
读,写,关闭,3个操作和3个channel状态 有9种常见 场景
操作 channel处于nil channel处于 active channel处于closed
<-ch 阻塞 成功或阻塞 读到 0 值
ch <- 阻塞 成功或阻塞 panic
close(ch) panic 成功 panic
注:当nil通道在select的某个case时,这个case将阻塞,但不会造成死锁
- 支持使用for range 读channel
3 使用 _,ok 判断channel是否关闭
if v, ok := <- ch; ok {
fmt.Println(v)
}
4 使用select处理多个channel
select同时监控多个 通道,只处理未阻塞的case,通道为nil,对应case永远阻塞,无论读写。
func (h *Handler) handler(job *Job) {
select {
case h.jobCh <- job:
return
case <-h.stopCh:
return
}
}
5 使用channel的声明控制读写权限
协程对某个通道只读或只写时
如果协程对某个channel只有写操作,则这个channel声明为只写。
如果协程对某个channel只有读操作,则这个channel声明为只读。
只有 generator进行对outCh进行写操作,返回声明。
<- chan int 可以防止其他协程乱用此通道,造成隐藏bug
func generator(int n) <- chan int {
outCh := make(chan int)
go func(){ # 匿名函数
for i:=0; i<n;i++{
outCh<-i
}
}()
return outCh
}
consumber 只读inCh数据,声明为 <- chan int,防止它向 inCh写数据
func consumer(inCh <- chan int){
for x := range inCh{
fmt.Println(x)
}
}
6 使用缓冲channel增强并发
无缓冲
ch1 := make(chan int)
ch2 := make(chan int, 0)
有缓冲
ch3 := make(chan int, 1)
func test() {
inCh := generator(100)
outCh := make(chan int, 10)
使用5个do
协程同时处理输入数据
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go do(inCh, outCh, &wg)
}
go func() {
wg.Wait()
close(outCh)
}()
for r := range outCh {
fmt.Println(r)
}
}
func generator(n int) <-chan int {
outCh := make(chan int)
go func() {
for i := 0; i < n; i++ {
outCh <- i
}
close(outCh)
}()
return outCh
}
func do(inCh <-chan int, outCh chan<- int, wg *sync.WaitGroup) {
for v := range inCh {
outCh <- v * v
}
wg.Done()
}
7 超时控制
看操作 和定时器 哪个先返回 就处理哪个
func doWithTimeOut(timeout time.Duration) (int, error) {
select {
case ret := <-do():
return ret, nil
case <-time.After(timeout):
return 0, errors.New("timeout")
}
}
func do() <-chan int {
outCh := make(chan int)
go func() {
// do work
}()
return outCh
}
8 channel 无阻塞读写
func unBlockRead(ch chan int) (x int, err error) {
select {
case x = <-ch:
return x, nil
case <-time.After(time.Microsecond):
return 0, errors.New("read time out")
}
}
func unBlockWrite(ch chan int, x int) (err error) {
select {
case ch <- x:
return nil
case <-time.After(time.Microsecond):
return errors.New("read time out")
}
}
9 关闭下游协程
可以使用WaitGroup等待所有协程退出
func (h *Handler) Stop() {
close(h.stopCh)
}
收到停止后,不再处理请求
func (h *Handler) loop() error {
for {
select {
case req := <-h.reqCh:
go handle(req)
case <-h.stopCh:
return
}
}
}
10 使用 chan struct{} 作为信号 channel
上例中的Handler.stopCh就是一个例子,stopCh并不需要传递任何数据.
只是要给所有协程发送退出的信号,没有数据传递,则传递空 struct。
type Handler struct {
stopCh chan struct{}
reqCh chan *Request
}
11 使用 channel 传递结构体的指针,而非结构体本身
推荐:
reqCh chan *Request
不推荐:
reqCh chan Request
12 使用channel传递channel
场景,使用场景多:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
存放结果的channel的channel
func main() {
reqs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
outs := make(chan chan int, len(reqs))
var wg sync.WaitGroup
wg.Add(len(reqs))
for _, x := range reqs {
o := handle(&wg, x)
outs <- o
}
go func() {
wg.Wait()
close(outs)
}()
读取结果,结果有序
for o := range outs {
fmt.Println(<-o)
}
}
handle 处理请求,耗时随机模拟
func handle(wg *sync.WaitGroup, a int) chan int {
out := make(chan int)
go func() {
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
out <- a
wg.Done()
}()
return out
}
13 小结
本文介绍Go语言的channel有3种状态:未初始化(nil)、活动(可读写)和关闭。使用chan struct{}
传递停止信号,推荐传递结构体指针而非结构体本身,甚至可以传递channel处理异步结果。
这里和接下来几个章节 记录一些go常见的特征 和 使用场景。