塞完数据之后close(chan)会怎样
一、close(chan)之后程序继续读取会怎样
看代码
type taskFunc func(ctx context.Context) error func Run(ctx context.Context, tasks ...taskFunc) (e error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() // 开辟tasks长度的buf chan queue := make(chan taskFunc, len(tasks)) for _, task := range tasks { queue <- task } // 关闭chan ???这里关闭 对后面读取有影响吗 close(queue) numCPU := runtime.NumCPU() wg := sync.WaitGroup{} wg.Add(numCPU) for i:=0; i<numCPU; i++ { go func() { defer wg.Done() for { select { case task, ok := <- queue: // 读取 if !ok { fmt.Println("chan closed!") return } if ctx.Err() != nil { fmt.Println("context error") return } if err := task(ctx); err != nil { e = err fmt.Println("task run error: ", err) return } case <-ctx.Done(): e = ctx.Err() fmt.Println("Done !") return } } }() } wg.Wait() fmt.Println("end") return nil } func T1(ctx context.Context) error { return nil } func T2(ctx context.Context) error { return nil } func T3(ctx context.Context) error { time.Sleep(10*time.Second) return errors.New("T3 超时了") } func T4(ctx context.Context) error { return errors.New("T4 error") } func main() { Run(context.Background(), T1, T2,T3, T4) }
测试:
import ( "context" "fmt" "go.uber.org/goleak" "testing" ) func TestMain(m *testing.M) { fmt.Println("start") goleak.VerifyTestMain(m) } func TestRun(t *testing.T) { Run(context.Background(), T1, T2,T3, T4) }
输出:
start === RUN TestRun chan closed! chan closed! chan closed! chan closed! task run error: T4 error chan closed! chan closed! task run error: T3 超时了 done --- PASS: TestRun (10.00s)
发现程序正常运行输出结果,这是为什么呢?
二、原因在select的接受处理上
看select源码发现,selectgo在循环处理case上有独特的顺序,且看:
loop: for i := 0; i < ncases; i++ { // 根据 `pollorder` 记录的随机 scases 索引来获取 cas casi = int(pollorder[i]) cas = &scases[casi] c = cas.c switch case.kind { case caseNil: continue case caseRecv: // ... case caseSend: // ... case caseDefault: // ... } } // ... }
根据pollorder
记录的随机scases
索引来遍历处理case
,然后根据case.kind
来查看channel
是否准备好,然后goto
跳转到相应逻辑。
case.kind
为caseNil
,说明channel
为nil
,那么continue
,不进行任何处理。
caseRecv
: channel
接收操作:
switch case.kind { case caseRecv: sg = c.sendq.dequeue() if sg != nil { goto recv } if c.qcount > 0 { goto bufrecv } if c.closed != 0 { goto rclose } ... }
- 看第一个if:如果
channel
中有待发送的goroutine
, 跳转到recv
,调用recv
完成接收操作。 - 看第二个if:如果
channel
中有缓冲数据,那么跳转到bufrecv
,从缓冲区中获取数据。 - 看第三个if:如果
channel
已关闭,跳转到rclose
, 将接收值置为空值,recvOK
置为false
。
看到了吧,在recv的时候,判断接受操作优先于判断关闭操作,所以我们上述程序在塞完task之后就直接关闭chan
三、小结
掌握channel
的for select
用法至关重要,稍不留意就会造成程序bug,所以平时多看看源码,想一想为什么。