作者:尹正杰
版权声明:原创作品,谢绝转载!否则将追究法律责任。
一.channel的基本使用
1.channel概述
共享内存交换数据弊端:
- 单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。
- 虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。
- 为了保证数据交换的正确性,很多并发模型中必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
Go语言采用的并发模型是CSP(Communicating Sequential Processes),提倡通过"通信共享内存"而不是通过"共享内存实现通信"。
管道(channel)特质介绍:
- 管道(channel)是一种特殊的类型,本质就是一个类似"队列"的数据结构;
- 管道(channel)像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序;
- 管道(channel)自身是线程安全,多协程访问时,不需要加锁;
- 管道(channel)是有类型的,一个string的管道只能存放string类型数据;
- 管道(channel)是可以让一个goroutine发送特定值到另一个goroutine的通信机制;
channel是Go语言中一种特有的类型。声明通道类型变量的格式如下:
var 变量名称 chan 元素类型
其中:
chan:是关键字
元素类型:是指通道中传递元素的类型
举几个例子:
var ch1 chan int // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道
2.管道入门案例
2.1 有缓冲管道和无缓冲管道概述
声明的通道类型变量需要使用内置的make函数初始化之后才能使用。具体格式如下:
make(chan 元素类型, [缓冲大小])
需要传递两个参数:
- 第一个参数
是channel存储的数据类型,比如"chan int"表示存储的是int类型。
- 第二个参数:
指的是channel的容量大小,channel的缓冲大小是可选的。若不指定默认值为0。
若容量为0则无法从中写入数据,如果非要写会报错"fatal error: all goroutines are asleep - deadlock!"
有缓冲管道特点:
- 1.只要管道的容量大于零,那么该管道就属于有缓冲的管道,管道的容量表示管道中最大能存放的元素数量。
- 2.当管道内已有元素数达到最大容量后,再向管道执行发送操作就会阻塞,除非有从管道执行接收操作。
无缓冲管道特点:
- 1.无缓冲的管道又称为阻塞的管道,单来说就是无缓冲的管道必须有至少一个接收方才能发送成功。
- 2.无缓冲的管道只有在有接收方能够接收值的时候才能发送成功,否则会一直处于等待发送的阶段。
- 3.同理,如果对一个无缓冲管道执行接收操作时,没有任何向管道中发送值的操作那么也会导致接收操作阻塞。
- 4.使用无缓冲管道进行通信将导致发送和接收的goroutine同步化,因此,无缓冲管道也被称为同步管道。
2.2 有缓冲管道
package main
import (
"fmt"
)
func main() {
// 1.定义一个int类型的管道
var intChan chan int
// 未初始化的通道类型变量其默认零值是nil。
fmt.Printf("intChan = %v\n", intChan)
//2.通过make初始化有缓冲管道,管道可以存放3个int类型的数据
intChan = make(chan int, 3)
// 3.管道是引用类型
fmt.Printf("intChan的值: %v\n", intChan)
// 4.向管道存放数据
intChan <- 100
intChan <- 200
// 存储的数据不能大于管道channel容量。
// intChan <- 300
// intChan <- 400
// 5.查看管道的长度
fmt.Printf("intChan的实际大小: %d, 容量是: %d\n", len(intChan), cap(intChan))
// 6.在管道中读取数据
data01 := <-intChan
data02 := <-intChan
// 注意,在没有使用协程的情况下,如果管道的数据已经全部取出,那么再取就会报错"fatal error: all goroutines are asleep - deadlock!"。
// data03 := <-intChan
fmt.Printf("data = %d\n", data01)
fmt.Printf("intChan的实际大小: %d, 容量是: %d\n", len(intChan), cap(intChan))
fmt.Printf("data = %d\n", data02)
// fmt.Printf("data = %d\n", data03)
}
2.3 无缓冲管道
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func Consumer(ch chan bool) {
// 在没有拿到数据之前,下面这行代码会一直处于阻塞状态哟!
data := <-ch
fmt.Println("in Consumer ... ", data)
wg.Done()
}
func main() {
// 定义无缓冲管道
ch := make(chan bool)
wg.Add(1)
go Consumer(ch)
fmt.Printf("已经开启Consumer协程...ch的容量为:%d,长度为:%d\n", cap(ch), len(ch))
// 故意阻塞住主线程3秒。
time.Sleep(time.Second * 3)
// 阻塞一段时间后,主线程再尝试发送数据,只要主线程发送数据,Consumer协程就可以不阻塞啦~
ch <- true
fmt.Printf("main函数执行结束...ch的容量为:%d,长度为:%d\n", cap(ch), len(ch))
wg.Wait()
}
3.管道的关闭
3.1 管道关闭操作结果概述
上面的表格中总结了对不同状态下的通道执行相应操作的结果。
一个通道值是可以被垃圾回收掉的。通道通常由发送方执行关闭操作,并且只有在接收方明确等待通道关闭的信号时才需要执行关闭操作。
它和关闭文件不一样,通常在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
关闭后的通道有以下特点:
- 对一个关闭的通道再发送值就会导致 panic。
- 对一个关闭的通道进行接收会一直获取值直到通道为空。
- 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
- 关闭一个已经关闭的通道会导致panic。
3.2 管道关闭案例
package main
import "fmt"
func main() {
var strChan chan string
strChan = make(chan string, 5)
strChan <- "JasonYin"
strChan <- "https://www.cnblogs.com/yinzhengjie"
// 我们通过调用内置的close函数来关闭通道。
close(strChan)
// 管道不能重复关闭,否则会报错"panic: close of closed channel"
// close(strChan)
// 关闭管道后就不能写入数据了,会报错"panic: send on closed channel"
// strChan <- "尹正杰"
// 管道关闭后,是可以读取数据的
data := <-strChan
fmt.Printf("data = %v\n", data)
}
3.3 判断通道是否关闭
package main
import (
"fmt"
)
func Consumer(ch chan string) {
for {
/*
对一个通道执行接收操作时支持使用如下多返回值模式。
- value:
从通道中取出的值,如果通道被关闭则返回对应类型的零值。
- ok:
通道ch关闭时返回 false,否则返回true。
*/
value, ok := <-ch
if !ok {
fmt.Println("通道已关闭")
break
}
fmt.Printf("value: %#v ok: %#v\n", value, ok)
}
}
func main() {
ch := make(chan string, 2)
ch <- "尹正杰"
ch <- "https://www.cnblogs.com/yinzhengjie"
close(ch)
Consumer(ch)
}
4.管道的遍历
package main
import "fmt"
func listChannel(ch chan bool) {
/*
管道的遍历:
管道支持for-range的方式进行遍历。
管道遍历注意三个细节:
- 1.如果管道没有关闭,则会出现deadlock的错误;
- 2.如果管道已经关闭,则会正常遍历数据,遍历完后,就会退出遍历;
- 3.目前Go语言中并没有提供一个不对通道进行读取操作就能判断通道是否被关闭的方法,不能简单的通过len(ch)操作来判断通道是否被关闭;
*/
for value := range ch {
fmt.Printf("value = %t\n", value)
}
}
func main() {
var boolChan chan bool
boolChan = make(chan bool, 6)
for i := 0; i < 2; i++ {
boolChan <- true
boolChan <- false
boolChan <- true
}
// 关闭管道,避免在遍历时出错
close(boolChan)
listChannel(boolChan)
}
5.协程和管道协同工作案例
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
// Producer 生产者产生数据
func Producer(strChan chan string) {
defer wg.Done()
for i := 1; i <= 20; i++ {
data := fmt.Sprintf("饺子id = %d", i)
strChan <- data
fmt.Printf("Duang~新包的%s\n", data)
time.Sleep(time.Second)
}
// 管道关闭,避免遍历时报错
close(strChan)
}
// Consumer 消费者消费数据
func Consumer(strChan chan string) {
defer wg.Done()
// 遍历数据
for value := range strChan {
fmt.Printf("吧唧,吃了%s\n", value)
}
}
func main() {
strChan := make(chan string, 20)
wg.Add(2)
// 开启读和写的协程
go Producer(strChan)
go Consumer(strChan)
wg.Wait()
fmt.Println("吃饺子程序运行结束...")
}
6.声明只读只写的管道
6.1 单向通道概述
在某些场景下我们可能会将管道作为参数在多个任务函数间进行传递。
我们会选择在不同的任务函数中对管道的使用进行限制,比如限制管道在某个函数中只能执行发送或只能执行接收操作。
6.2 声明单向管道
package main
import (
"fmt"
)
func main() {
// 1.默认情况下,管道是双向的,即可读可写
var intChan01 chan int
intChan01 = make(chan int, 3) // 初始化数据
intChan01 <- 100 // 写入数据到管道
data01 := <-intChan01 // 从管道中读取数据
fmt.Printf("data01 = %d\n", data01)
// 2.声明单向的只写管道,即只能写不能读
var intChan02 chan<- int
intChan02 = make(chan int, 3) // 初始化数据
intChan02 <- 200 // 写入数据到管道
// data02 := <-intChan02 // 无法从管道中读取数据,会报错"invalid operation: cannot receive from send-only channel intChan02 (variable of type chan<- int)"
// fmt.Printf("data02 = %d\n", data02)
// 3.声明单向的只读管道,即只能读不能写
var intChan03 <-chan int
// intChan03 <- 200 // 无法写入数据到管道,会报错"invalid operation: cannot send to receive-only channel intChan03 (variable of type <-chan int)"
if intChan03 != nil {
data03 := <-intChan03
fmt.Printf("data03 = %d\n", data03)
}
}
6.3 单向管道作为参数传递
package main
import (
"fmt"
)
// Producer 返回一个只写的管道,并持续将符合条件的数据发送至返回的管道中,数据发送完成后会将返回的管道关闭
func Producer() <-chan int {
ch := make(chan int, 2)
// 创建一个新的goroutine执行发送数据的任务
go func() {
// 将10以内的偶数返回
for i := 0; i <= 10; i++ {
if i%2 == 0 {
ch <- i
}
}
// 任务完成后关闭通道,避免另外的协程在遍历时出错。
close(ch)
}()
return ch
}
// Consumer参数为只读管道
func Consumer(ch <-chan int) int {
sum := 0
for value := range ch {
fmt.Printf("in Consumer ... %d + %d \n", sum, value)
sum += value
}
return sum
}
func main() {
channel := Producer()
result := Consumer(channel)
fmt.Printf("in main ... \tresult = %d\n", result)
}
7.管道的阻塞
7.1 什么时候会出现管道的阻塞
- 1.只读不写的情况下会阻塞管道
- 2.只读不写的情况下会阻塞管道
7.2 只写不读缓冲管道满就会阻塞
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
// Producer 生产者产生数据
func Producer(strChan chan string) {
defer wg.Done()
for i := 1; i <= 20; i++ {
data := fmt.Sprintf("饺子id = %d", i)
strChan <- data
fmt.Printf("Duang~新包的%s\n", data)
time.Sleep(time.Second)
}
// 管道关闭,避免遍历时报错
close(strChan)
}
// Consumer 消费者消费数据
func Consumer(strChan chan string) {
defer wg.Done()
// 遍历数据
for value := range strChan {
fmt.Printf("吧唧,吃了%s\n", value)
}
}
func main() {
// 缓冲管道大小为5
strChan := make(chan string, 5)
wg.Add(1)
// 只写不读缓冲管道满就会阻塞,报错: "fatal error: all goroutines are asleep - deadlock!"
go Producer(strChan)
// go Consumer(strChan)
wg.Wait()
fmt.Println("吃饺子程序运行结束...")
}
7.3 只读不写的情况下会阻塞管道
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
// Producer 生产者产生数据
func Producer(strChan chan string) {
defer wg.Done()
for i := 1; i <= 20; i++ {
data := fmt.Sprintf("饺子id = %d", i)
strChan <- data
fmt.Printf("Duang~新包的%s\n", data)
time.Sleep(time.Second)
}
// 管道关闭,避免遍历时报错
close(strChan)
}
// Consumer 消费者消费数据
func Consumer(strChan chan string) {
defer wg.Done()
// 遍历数据
for value := range strChan {
fmt.Printf("吧唧,吃了%s\n", value)
}
}
func main() {
// 缓冲管道大小为5
strChan := make(chan string, 5)
wg.Add(1)
// 只读不写缓冲管道就会阻塞,报错: "fatal error: all goroutines are asleep - deadlock!"
// go Producer(strChan)
go Consumer(strChan)
wg.Wait()
fmt.Println("吃饺子程序运行结束...")
}
二.select多路复用
1.select语句概述
在某些场景下我们可能需要同时从多个管道接收数据。
Go语言内置了select关键字,解决多个管道的选择问题,也可以叫做多路复用,可以从多个管道中随机公平地选择一个来执行。
select语句具有以下特点:
- 1.case后面必须进行IO操作,不能是等值,随机去选择一个IO操作;
- 2.default防止select被阻塞住,如果没有case符合,则走default分支;
2.select案例
package main
import (
"fmt"
"time"
)
func ProducerInt(ch chan int) {
ch <- 10
for i := 100; i <= 110; i++ {
time.Sleep(time.Second * 3)
ch <- i
}
}
func ProducerStr(ch chan string) {
for i := 0; i < 10; i++ {
time.Sleep(time.Second * 2)
ch <- fmt.Sprintf("golang00%d\n", i)
}
}
func main() {
// 定义一个int管道
intChan := make(chan int, 1)
// 定义一个string管道
strChan := make(chan string, 1)
go ProducerInt(intChan)
go ProducerStr(strChan)
for count := 1; count <= 60; count++ {
// select和前面学习的switch语句运行逻辑很相似
select {
// case后面必须进行IO操作,不能是等值,随机去选择一个IO操作,多个case如果都符合,则会随机选择一个去执行。
case value := <-intChan:
fmt.Printf("intChan = %v\n", value)
case value := <-intChan:
fmt.Printf("strChan = %v\n", value)
// 如果所有的case语句都不符合,则走默认的default语句哟~
default:
fmt.Printf("程序已经运行%d秒\n", count)
time.Sleep(time.Second * 1)
}
}
}