Go基础系列:channel入门

简介:

channel基础

channel用于goroutines之间的通信,让它们之间可以进行数据交换。像管道一样,一个goroutine_A向channel_A中放数据,另一个goroutine_B从channel_A取数据

channel是指针类型的数据类型,通过make来分配内存。例如:

1ch := make(chan int)

这表示创建一个channel,这个channel中只能保存int类型的数据。也就是说一端只能向此channel中放进int类型的值,另一端只能从此channel中读出int类型的值。

需要注意,chan TYPE才表示channel的类型。所以其作为参数或返回值时,需指定为xxx chan int类似的格式。

向ch这个channel放数据的操作形式为:

1ch <- VALUE

从ch这个channel读数据的操作形式为:

1<-ch // 从ch中读取一个值val = <-chval := <-ch // 从ch中读取一个值并保存到val变量中val,ok = <-ch // 从ch读取一个值,判断是否读取成功,如果成功则保存到val变量中

其实很简单,当ch出现在<-的左边表示send,当ch出现在<-的右边表示recv。

例如:

 1package mainimport ( "fmt"
2 "time")func main() {
3 ch := make(chan string) go sender(ch) // sender goroutine
4 go recver(ch) // recver goroutine
5 time.Sleep(1e9)
6}func sender(ch chan string) {
7 ch <- "malongshuai"
8 ch <- "gaoxiaofang"
9 ch <- "wugui"
10 ch <- "tuner"}func recver(ch chan string) { var recv string
11 for {
12 recv = <-ch
13 fmt.Println(recv)
14 }
15}

输出结果:

1malongshuaigaoxiaofang
2wugui
3tuner

上面激活了一个goroutine用于执行sender()函数,该函数每次向channel ch中发送一个字符串。同时还激活了另一个goroutine用于执行recver()函数,该函数每次从channel ch中读取一个字符串。

注意上面的recv = <-ch,当channel中没有数据可读时,recver goroutine将会阻塞在此行。由于recver中读取channel的操作放在了无限for循环中,表示recver goroutine将一直阻塞,直到从channel ch中读取到数据,读取到数据后进入下一轮循环由被阻塞在recv = <-ch上。直到main中的time.Sleep()指定的时间到了,main程序终止,所有的goroutine将全部被强制终止。

因为receiver要不断从channel中读取可能存在的数据,所以receiver一般都使用一个无限循环来读取channel,避免sender发送的数据被丢弃。

channel的属性和分类

每个channel都有3种操作:send、receive和close

 ●  send:表示sender端的goroutine向channel中投放数据
 ●  receive:表示receiver端的goroutine从channel中读取数据
 ●  close:表示关闭channel
     ●  关闭channel后,send操作将导致painc
     ●  关闭channel后,recv操作将返回对应类型的0值以及一个状态码false
      ●  close并非强制需要使用close(ch)来关闭channel,在某些时候可以自动被关闭
       ●  如果使用close(),建议条件允许的情况下加上defer
       ●  只在sender端上显式使用close()关闭channel。因为关闭通道意味着没有数据再需要发送

例如,判断channel是否被关闭:

1val, ok := <-counterif ok {
2 fmt.Println(val)
3}

channel分为两种:unbuffered channel和buffered channel

 ●  unbuffered channel:阻塞、同步模式
 ●  sender端向channel中send一个数据,然后阻塞,直到receiver端将此数据receive
 ●  receiver端一直阻塞,直到sender端向channel发送了一个数据
 ●  buffered channel:非阻塞、异步模式
 ●  sender端可以向channel中send多个数据(只要channel容量未满),容量满之前不会阻塞
 ●  receiver端按照队列的方式(FIFO,先进先出)从buffered channel中按序receive其中数据

buffered channel有两个属性:容量和长度:和slice的capacity和length的概念是一样的

 ●  capacity:表示bufffered channel最多可以缓冲多少个数据
 ●  length:表示buffered channel当前已缓冲多少个数据
 ●  创建buffered channel的方式为 make(chan TYPE,CAP)

unbuffered channel可以认为是容量为0的buffered channel,所以每发送一个数据就被阻塞。注意,不是容量为1的buffered channel,因为容量为1的channel,是在channel中已有一个数据,并发送第二个数据的时候才被阻塞。

换句话说,send被阻塞的时候,其实是没有发送成功的,只有被另一端读走一个数据之后才算是send成功。对于unbuffered channel来说,这是send/recv的同步模式。

实际上,当向一个channel进行send的时候,先关闭了channel,再读取channel时会发现错误在send,而不是recv。它会提示向已经关闭了的channel发送数据。

1func main() {
2 counter := make(chan int) go func() {
3 counter <- 32
4 }() close(counter)
5 fmt.Println(<-counter)
6}

输出报错:

1panic: send on closed channel

所以,在Go的内部行为中,send和recv是一个整体行为,数据未读就表示未send成功

死锁(deadlock)

当channel的某一端(sender/receiver)期待另一端的(receiver/sender)操作,另一端正好在期待本端的操作时,也就是说两端都因为对方而使得自己当前处于阻塞状态,这时将会出现死锁问题。

比如,在main函数中,它有一个默认的goroutine,如果在此goroutine中创建一个unbuffered channel,并在main goroutine中向此channel中发送数据并直接receive数据,将会出现死锁:

1package main 
2
3import ( "fmt")func main (){
4 goo(32)
5}func goo(s int) {
6 counter := make(chan int)
7 counter <- s
8 fmt.Println(<-counter)
9}

在上面的示例中,向unbuffered channel中send数据的操作counter <- s是在main goroutine中进行的,从此channel中recv的操作<-counter也是在main goroutine中进行的。send的时候会直接阻塞main goroutine,使得recv操作无法被执行,go将探测到此问题,并报错:

1fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan send]:

要修复此问题,只需将send操作放在另一个goroutine中执行即可:

1package mainimport ( "fmt")func main() {
2 goo(32)
3}func goo(s int) {
4 counter := make(chan int) go func() {
5 counter <- s
6 }()
7 fmt.Println(<-counter)
8}

或者,将counter设置为一个容量为1的buffered channel:

1counter := make(chan int,1)

这样放完一个数据后send不会阻塞(被recv之前放第二个数据才会阻塞),可以执行到recv操作。

unbuffered channel同步通信示例

下面通过sync.WaitGroup类型来等待程序的结束,分析多个goroutine之间通信时状态的转换。因为创建的channel是unbuffered类型的,所以send和recv都是阻塞的。

 1package mainimport ( "fmt"
2 "sync")// wg用于等待程序执行完成var wg sync.WaitGroupfunc main() {
3 count := make(chan int) // 增加两个待等待的goroutines
4 wg.Add(2)
5 fmt.Println("Start Goroutines") // 激活一个goroutine,label:"Goroutine-1"
6 go printCounts("Goroutine-1", count) // 激活另一个goroutine,label:"Goroutine-2"
7 go printCounts("Goroutine-2", count)
8
9 fmt.Println("Communication of channel begins") // 向channel中发送初始数据
10 count <- 1
11
12 // 等待goroutines都执行完成
13 fmt.Println("Waiting To Finish")
14 wg.Wait()
15 fmt.Println("\nTerminating the Program")
16}func printCounts(label string, count chan int) { // goroutine执行完成时,wg的计数器减1
17 defer wg.Done() for { // 从channel中接收数据
18 // 如果无数据可recv,则goroutine阻塞在此
19 val, ok := <-count if !ok {
20 fmt.Println("Channel was closed:",label) return
21 }
22 fmt.Printf("Count: %d received from %s \n", val, label) if val == 10 {
23 fmt.Printf("Channel Closed from %s \n", label) // Close the channel
24 close(count) return
25 } // 输出接收到的数据后,加1,并重新将其send到channel中
26 val++
27 count <- val
28 }
29}

上面的程序中,激活了两个goroutine,激活这两个goroutine后,向channel中发送一个初始数据值1,然后main goroutine将因为wg.Wait()等待2个goroutine都执行完成而被阻塞。

再看这两个goroutine,这两个goroutine执行完全一样的函数代码,它们都接收count这个channel的数据,但可能是goroutine1先接收到channel中的初始值1,也可能是goroutine2先接收到初始值1。接收到数据后输出值,并在输出后对数据加1,然后将加1后的数据再次send到channel,每次send都会将自己这个goroutine阻塞(因为unbuffered channel),此时另一个goroutine因为等待recv而执行。当加1后发送给channel的数据为10之后,某goroutine将关闭count channel,该goroutine将退出,wg的计数器减1,另一个goroutine因等待recv而阻塞的状态将因为channel的关闭而失败,ok状态码将让该goroutine退出,于是wg的计数器减为0,main goroutine因为wg.Wait()而继续执行后面的代码。

使用for range迭代channel

前面都是在for无限循环中读取channel中的数据,但也可以使用range来迭代channel,它会返回每次迭代过程中所读取的数据,直到channel被关闭。

例如,将上面示例中的printCounts()改为for-range的循环形式。

1func printCounts(label string, count chan int) { defer wg.Done() for val := range count {
2 fmt.Printf("Count: %d received from %s \n", val, label) if val == 10 {
3 fmt.Printf("Channel Closed from %s \n", label) close(count) return
4 }
5 val++
6 count <- val
7 }
8}
多个"管道":输出作为输入

channel是goroutine与goroutine之间通信的基础,一边产生数据放进channel,另一边从channel读取放进来的数据。可以借此实现多个goroutine之间的数据交换,例如goroutine_1->goroutine_2->goroutine_3,就像bash的管道一样,上一个命令的输出可以不断传递给下一个命令的输入,只不过golang借助channel可以在多个goroutine(如函数的执行)之间传,而bash是在命令之间传。

以下是一个示例,第一个函数getRandNum()用于生成随机整数,并将生成的整数放进第一个channel ch1中,第二个函数addRandNum()用于接收ch1中的数据(来自第一个函数),将其输出,然后对接收的值加1后放进第二个channel ch2中,第三个函数printRes接收ch2中的数据并将其输出。

如果将函数认为是Linux的命令,则类似于下面的命令行:ch1相当于第一个管道,ch2相当于第二个管道

1getRandNum | addRandNum | printRes

以下是代码部分:

 1package mainimport ( "fmt"
2 "math/rand"
3 "sync")var wg sync.WaitGroupfunc main() {
4 wg.Add(3) // 创建两个channel
5 ch1 := make(chan int)
6 ch2 := make(chan int) // 3个goroutine并行
7 go getRandNum(ch1) go addRandNum(ch1, ch2) go printRes(ch2)
8
9 wg.Wait()
10}func getRandNum(out chan int) { // defer the wg.Done()
11 defer wg.Done() var random int
12 // 总共生成10个随机数
13 for i := 0; i < 10; i++ { // 生成[0,30)之间的随机整数并放进channel out
14 random = rand.Intn(30)
15 out <- random
16 } close(out)
17}func addRandNum(in,out chan int) { defer wg.Done() for v := range in { // 输出从第一个channel中读取到的数据
18 // 并将值+1后放进第二个channel中
19 fmt.Println("before +1:",v)
20 out <- (v + 1)
21 } close(out)
22}func printRes(in chan int){ defer wg.Done() for v := range in {
23 fmt.Println("after +1:",v)
24 }
25}
指定channel的方向

上面通过两个channel将3个goroutine连接起来,其中起连接作用的是第二个函数addRandNum()。在这个函数中使用了两个channel作为参数:一个channel用于接收、一个channel用于发送。

其实channel类的参数变量可以指定数据流向: ●  

in <-chan int :表示channel in通道只用于接收数据

out chan<- int:表示channel out通道只用于发送数据

55bf572e236b948e6b4a0c07f00365ff85d1b323

只用于接收数据的通道<-chan不可被关闭,因为关闭通道是针对发送数据而言的,表示无数据再需发送。对于recv来说,关闭通道是没有意义的。

所以,上面示例中三个函数可改写为:

1func getRandNum(out chan<- int) {
2 ...
3}func addRandNum(in <-chan int, out chan<- int) {
4 ...
5}func printRes(in <-chan int){
6 ...
7}
buffered channel异步队列请求示例

下面是使用buffered channel实现异步处理请求的示例。

在此示例中:

 ●   有(最多)3个worker,每个worker是一个goroutine,它们有worker ID。
 ●   每个worker都从一个buffered channel中取出待执行的任务,每个任务是一个struct结构,包含了任务id(JobID),当前任务的队列号(ID)以及任务的状态(worker是否执行完成该任务)。
 ●   在main goroutine中将每个任务struct发送到buffered channel中,这个buffered channel的容量为10,也就是最多只允许10个任务进行排队。
 ●   worker每次取出任务后,输出任务号,然后执行任务(run),最后输出任务id已完成。
 ●   每个worker执行任务的方式很简单:随机睡眠0-1秒钟,并将任务标记为完成。

以下是代码部分:

 1package mainimport ( "fmt"
2 "math/rand"
3 "sync"
4 "time")type Task struct {
5 ID int
6 JobID int
7 Status string
8 CreateTime time.Time
9}func (t *Task) run() {
10 sleep := rand.Intn(1000)
11 time.Sleep(time.Duration(sleep) * time.Millisecond)
12 t.Status = "Completed"}var wg sync.WaitGroup// worker的数量,即使用多少goroutine执行任务const workerNum = 3func main() {
13 wg.Add(workerNum) // 创建容量为10的buffered channel
14 taskQueue := make(chan *Task, 10) // 激活goroutine,执行任务
15 for workID := 0; workID <= workerNum; workID++ { go worker(taskQueue, workID)
16 } // 将待执行任务放进buffered channel,共15个任务
17 for i := 1; i <= 15; i++ {
18 taskQueue <- &Task{
19 ID: i,
20 JobID: 100 + i,
21 CreateTime: time.Now(),
22 }
23 } close(taskQueue)
24 wg.Wait()
25}// 从buffered channel中读取任务,并执行任务func worker(in <-chan *Task, workID int) { defer wg.Done() for v := range in {
26 fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
27 v.run()
28 fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
29 }
30}
select多路监听

很多时候想要同时操作多个channel,比如从ch1、ch2读数据。Go提供了一个select语句块,它像switch一样工作,里面放一些case语句块,用来轮询每个case语句块的send或recv情况。

select

用法格式示例:

1select { // ch1有数据时,读取到v1变量中
2 case v1 := <-ch1:
3 ... // ch2有数据时,读取到v2变量中
4 case v2 := <-ch2:
5 ... // 所有case都不满足条件时,执行default
6 default:
7 ...
8}

defalut语句是可选的,不允许fall through行为,但允许case语句块为空块。select会被return、break关键字中断。

select的行为模式主要是对channel是否可读进行轮询,但也可以用来向channel发送数据。它的行为如下:

 ●  如果所有的case语句块都被阻塞,则阻塞直到某个语句块可以被处理
 ●  如果多个case同时满足条件,则 随机选择 一个进行处理
 ●  如果存在default且其它case都不满足条件,则执行default。所以default必须要可执行而不能阻塞

需要注意的是,如果在select中执行send操作,则可能会永远被send阻塞。所以,在使用send的时候,应该也使用defalut语句块,保证send不会被阻塞

一般来说,select会放在一个无限循环语句中,一直轮询channel的可读事件。

下面是一个示例,pump1()和pump2()都用于产生数据(一个产生偶数,一个产生奇数),并将数据分别放进ch1和ch2两个通道,suck()则从ch1和ch2中读取数据。然后在无限循环中使用select轮询这两个通道是否可读,最后main goroutine在1秒后强制中断所有goroutine。

 1package mainimport ( "fmt"
2 "time")func main() {
3 ch1 := make(chan int)
4 ch2 := make(chan int) go pump1(ch1) go pump2(ch2) go suck(ch1, ch2)
5 time.Sleep(1e9)
6}func pump1(ch chan int) { for i := 0; i <= 30; i++ { if i%2 == 0 {
7 ch <- i
8 }
9 }
10}func pump2(ch chan int) { for i := 0; i <= 30; i++ { if i%2 == 1 {
11 ch <- i
12 }
13 }
14}func suck(ch1 chan int, ch2 chan int) { for { select { case v := <-ch1:
15 fmt.Printf("Recv on ch1: %d\n", v) case v := <-ch2:
16 fmt.Printf("Recv on ch2: %d\n", v)
17 }
18 }
19}


原文发布时间为:2018-11-20

本文作者:xxx

本文来自云栖社区合作伙伴“Golang语言社区”,了解相关信息可以关注“Golang语言社区”。

相关文章
|
2月前
|
存储 Go 开发者
Go语言中的并发编程与通道(Channel)的深度探索
本文旨在深入探讨Go语言中并发编程的核心概念和实践,特别是通道(Channel)的使用。通过分析Goroutines和Channels的基本工作原理,我们将了解如何在Go语言中高效地实现并行任务处理。本文不仅介绍了基础语法和用法,还深入讨论了高级特性如缓冲通道、选择性接收以及超时控制等,旨在为读者提供一个全面的并发编程视角。
|
2月前
|
安全 Go 数据处理
Go语言中的并发编程:掌握goroutine和channel的艺术####
本文深入探讨了Go语言在并发编程领域的核心概念——goroutine与channel。不同于传统的单线程执行模式,Go通过轻量级的goroutine实现了高效的并发处理,而channel作为goroutines之间通信的桥梁,确保了数据传递的安全性与高效性。文章首先简述了goroutine的基本特性及其创建方法,随后详细解析了channel的类型、操作以及它们如何协同工作以构建健壮的并发应用。此外,还介绍了select语句在多路复用中的应用,以及如何利用WaitGroup等待一组goroutine完成。最后,通过一个实际案例展示了如何在Go中设计并实现一个简单的并发程序,旨在帮助读者理解并掌
|
2月前
|
安全 Go 调度
探索Go语言的并发模型:goroutine与channel
在这个快节奏的技术世界中,Go语言以其简洁的并发模型脱颖而出。本文将带你深入了解Go语言的goroutine和channel,这两个核心特性如何协同工作,以实现高效、简洁的并发编程。
|
2月前
|
Go 调度 开发者
探索Go语言中的并发模式:goroutine与channel
在本文中,我们将深入探讨Go语言中的核心并发特性——goroutine和channel。不同于传统的并发模型,Go语言的并发机制以其简洁性和高效性著称。本文将通过实际代码示例,展示如何利用goroutine实现轻量级的并发执行,以及如何通过channel安全地在goroutine之间传递数据。摘要部分将概述这些概念,并提示读者本文将提供哪些具体的技术洞见。
|
2月前
|
存储 设计模式 安全
Go语言中的并发编程:从入门到精通###
本文深入探讨了Go语言中并发编程的核心概念与实践技巧,旨在帮助读者从理论到实战全面掌握Go的并发机制。不同于传统的技术文章摘要,本部分将通过一系列生动的案例和代码示例,直观展示Go语言如何优雅地处理并发任务,提升程序性能与响应速度。无论你是Go语言初学者还是有一定经验的开发者,都能在本文中找到实用的知识与灵感。 ###
|
2月前
|
Serverless Go
Go语言中的并发编程:从入门到精通
本文将深入探讨Go语言中并发编程的核心概念和实践,包括goroutine、channel以及sync包等。通过实例演示如何利用这些工具实现高效的并发处理,同时避免常见的陷阱和错误。
|
3月前
|
安全 Go 开发者
破译Go语言中的并发模式:从入门到精通
在这篇技术性文章中,我们将跳过常规的摘要模式,直接带你进入Go语言的并发世界。你将不会看到枯燥的介绍,而是一段代码的旅程,从Go的并发基础构建块(goroutine和channel)开始,到高级模式的实践应用,我们共同探索如何高效地使用Go来处理并发任务。准备好,让Go带你飞。
|
3月前
|
安全 Go 调度
探索Go语言的并发之美:goroutine与channel
在这个快节奏的技术时代,Go语言以其简洁的语法和强大的并发能力脱颖而出。本文将带你深入Go语言的并发机制,探索goroutine的轻量级特性和channel的同步通信能力,让你在高并发场景下也能游刃有余。
|
3月前
|
存储 安全 Go
探索Go语言的并发模型:Goroutine与Channel
在Go语言的多核处理器时代,传统并发模型已无法满足高效、低延迟的需求。本文深入探讨Go语言的并发处理机制,包括Goroutine的轻量级线程模型和Channel的通信机制,揭示它们如何共同构建出高效、简洁的并发程序。
|
3月前
|
存储 Go 调度
深入理解Go语言的并发模型:goroutine与channel
在这个快速变化的技术世界中,Go语言以其简洁的并发模型脱颖而出。本文将带你穿越Go语言的并发世界,探索goroutine的轻量级特性和channel的同步机制。摘要部分,我们将用一段对话来揭示Go并发模型的魔力,而不是传统的介绍性文字。