第十章 Channel--第四天 完结

简介: channel用于goroutine之间的通讯. 其内部实现了同步, 确保并发安全, 多个goroutine同时访问, 不需要加锁.

channel用于goroutine之间的通讯. 其内部实现了同步, 确保并发安全, 多个goroutine同时访问, 不需要加锁.


对channel操作行为做如下总结:


1. ch <- : 写入channel


2. ch -> :读出channel


3. clouse : 关闭channel

 

golang 中大部分类型都是值类型, 只有 slice / channel / map 是引用类型

 

一. channel的语法


1. channel的定义方法


var c chan int //定义了一个int类型的chan  此时c=nil


定义一个有初始值的channel.


c := make(chan int)


2. channel是goroutine之间的通讯. 所有有一个goroutine发数据, 就要有一个goroutine收数据


package main
import (
    "fmt"
    "time"
)
func main() {
    c := make(chan int)
    go func() {
        for  {
        // 取出channel
            n := <- c
            fmt.Println(n)
        }
    }()
    // 放入channel
    c <- 1
    c <- 2
    // 让主进程停留1s, 保证channel中的数据全部取出后, 主协程再关闭
    time.Sleep(time.Second)
}


有一个goroutine发数据,必须有一个goroutine收数据, 不然就不能放进去了, 会报异常deadlock


fatal error: all goroutines are asleep - deadlock!


3. 函数是一等公民, channel也是, channel也能作为参数, 也能作为返回值


a. channel作为参数传入


package main
import (
    "fmt"
    "time"
)
// chan也可以作为参数传入. 也可以作为返回值
func getChan(c chan int) {
    for  {
        n := <- c
        fmt.Println(n)
    }
}
func chanDemo () {
    c := make(chan int)
    go getChan(c)
    c <- 1 c <- 2 // 让主进程停留1s, 保证channel中的数据全部取出后, 主进程再关闭  
    time.Sleep(time.Second) 
} 
func main() { 
  chanDemo() 
}


b. channel用作数组


package main
import (
    "fmt"
    "time"
)
// chan也可以作为参数传入. 也可以作为返回值
func worker(i int, c chan int) {
    for  {
        n := <- c
        fmt.Printf("number: %d, worker, %c \n", i, n)
    }
}
func chanDemo () {
    // 开了10个协程, 让10个协程分别取各自的数据
    var channel [10]chan int
    for i := 0; i<10; i++ {
        channel[i] = make(chan int)
        go func(c chan int ) {
            worker(0, c)
        }(channel[i])
    }
    for i := 0; i < 10; i++ {
        channel[i] <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        channel[i] <- 'A' + i
    }
    // 让主进程停留1s, 保证channel中的数据全部取出后, 主进程再关闭
    time.Sleep(time.Second)
}
func main() {
    chanDemo()
}


分配一个管道数组, 里面有10个管道. 向每个管道里放两个数据. 打印.


c. channel作为返回值


package main
import (
    "fmt"
    "time"
)
// chan也可以作为参数传入. 也可以作为返回值
func createWorker(i int) chan int {
    c := make(chan int)
    go func() {
        for  {
            n := <- c
            fmt.Printf("number: %d, worker, %c \n", i, n)
        }
    }()
    return c
}
func chanDemo () {
    // 开了10个协程, 让10个协程分别取各自的数据
    var channel [10]chan int
    for i := 0; i<10; i++ {
        channel[i] = createWorker(i)
    }
    for i := 0; i < 10; i++ {
        channel[i] <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        channel[i] <- 'A' + i
    }
    // 让主进程停留1s, 保证channel中的数据全部取出后, 主进程再关闭
    time.Sleep(time.Second)
}
func main() {
    chanDemo()
}


4 给channel定义方向.


channel可以收数据也可以发数据. 那么我们返回的channel到底是收数据的还是发数据的呢? 我们可以告诉调用者.


package main
import (
    "fmt"
    "time"
)
// chan也可以作为参数传入. 也可以作为返回值
func createWorker(i int) chan<- int {
    c := make(chan int)
    go func() {
        // goroutine里, channel是发数据的. 那么我们定义返回的channel只能是收数据的
        for  {
            n := <- c
            fmt.Printf("number: %d, worker, %c \n", i, n)
        }
    }()
    return c
}
func chanDemo () {
    // 开了10个协程, 让10个协程分别取各自的数据
    var channel [10]chan<- int
    for i := 0; i<10; i++ {
        channel[i] = createWorker(i)
    }
    for i := 0; i < 10; i++ {
        channel[i] <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        channel[i] <- 'A' + i
    }
    // 让主进程停留1s, 保证channel中的数据全部取出后, 主进程再关闭
    time.Sleep(time.Second)
}
func main() {
    chanDemo()
}


5. bufferedChannel 带有缓冲区的channel


我们知道如果创建了一个channel, 往里面放了数据,但是没有人接收, 那么就会deadlock死锁. 也就是必须要有人立刻能够接收走. 这就是要求, 要求立刻被收走. 如果没有, 就报错. 我们可以给他一个缓冲, 让他在几个范围内可以不被立刻收走, 给收数据方一个缓冲的时间


func bufferedChannel() {
    c := make(chan int, 3)
    c <- 1
    c <- 2
    c <- 3
    //c <- 4
}
func main() {
    bufferedChannel()


上面的demo, 定义了带有三个缓冲的channel. 里面3个以内数据不会报deadlock. 超过三个还没有被取走, 才会包deadlock


package main
import (
    "fmt"
    "time"
)
func worker(i int, c chan int) {
    for  {
        n := <- c
        fmt.Printf("number: %d, worker, %d \n", i, n)
    }
}
func bufferedChannel() {
    c := make(chan int, 3)
    go worker(0, c)
    c <- 1
    c <- 2
    c <- 3
    time.Sleep(time.Second)
}
func main() {
    bufferedChannel()
}


6. channel Close()


  • channel 发完了, 我没有数据可以再发了, 是可以close的. channel 的close是有发数据方close.


  • channel的发送方close了, 但是接收方依然是可以接收到数据的. 接收数据的返回值跟channel的类型有关系


  • 接收方可以通过ok判断是否接收到数据了


package main
import (
    "fmt"
    "time"
)
func worker(i int, c chan int) {
    // goroutine里, channel是发数据的. 那么我们定义返回的channel只能是收数据的
    for  {
        n, ok := <- c
        if !ok {
            // 如果没有数据了, 就返回
            break
        }
        fmt.Printf("number: %d, worker, %d \n", i, n)
    }
}
// channel发数据方, 数据发完了, 是可以close的
func channelClose() {
    c := make(chan int, 3)
    go worker(0, c)
    c <- 'a'
    c <- 'b'
    c <- 'c'
    c <- 'd'
    // 调用close 告诉接收方, 数据已经发完了.
    close(c)
    time.Sleep(time.Second)
}
func main() {
    //bufferedChannel()
    channelClose()
}


另一种判断是否有数据的方法, 使用range来判断.


package main
import (
    "fmt"
    "time"
)
func worker(i int, c chan int) {
    // goroutine里, channel是发数据的. 那么我们定义返回的channel只能是收数据的
    for n := range c{
        fmt.Printf("number: %d, worker, %d \n", i, n)
    }
}
// channel发数据方, 数据发完了, 是可以close的
func channelClose() {
    c := make(chan int, 3)
    go worker(0, c)
    c <- 'a'
    c <- 'b'
    c <- 'c'
    c <- 'd'
    // 调用close 告诉接收方, 数据已经发完了.
    close(c)
    time.Sleep(time.Second)
}
func main() {
    //bufferedChannel()
    channelClose()
}


二. Channel的应用: 不要通过共享内存来通信, 通过通信来共享内存


通常, 比如java是通过共享内存来通信. 比如定义一个公共的flag, 这个flag就是共享的一块内存空间. 他的值变了, 通知调用者. 这就是通过共享内存来通信.

 

使用共享内存的话在多线程的场景下为了处理竞态,需要加锁,使用起来比较麻烦。另外使用过多的锁,容易使得程序的代码逻辑坚涩难懂,并且容易使程序死锁,死锁了以后排查问题相当困难,特别是很多锁同时存在的时候。


go语言的channel保证同一个时间只有一个goroutine能够访问里面的数据,为开发者提供了一种优雅简单的工具,所以go原生的做法就是使用channle来通信,而不是使用共享内存来通信。

 

下面来感受一下go的通过通信来共享内存.


三.  使用channel等待任务结束


我们在上面的demo中, 都会有一句话


time.Sleep(time.Second)


让主程序休眠1秒钟.  否则, 协程还没执行完,主程序就退出了


再来分析, 等待的目的是什么呢? 那就是等协程都执行完了,  主程序再退出. 换个思路, 我不用time.sleep, 能不能协程执行完了, 主动告诉主线程, 我执行完了, 退出吧呢?


可以的.


1. 使用通信来共享内存, 用channel实现.


go的原则: 不要通过共享内存来通信, 通信来共享内存. 


go中通信用什么呢? 使用channel


package main
import (
    "fmt"
)
// 定义一个bool类型的chan, 用来和外部通信. 事情做完了, 给外面回复一个done
func work(i int, w worker) {
    for n := range w.c {
        fmt.Printf("number: %d, worker, %c \n", i, n)
        w.done <- true
    }
}
type worker struct {
    c chan int        // 传递字母的管道
    done chan bool        // 标记字母传递完成的管道
}
// chan也可以作为参数传入. 也可以作为返回值
func createWorker(i int, w worker) worker {
    w = worker{
        c : make(chan int),
        done : make(chan bool),
    }
    go work(i, w)
    return w
}
func chanDemo () {
    // 开了10个协程, 有10个工作者
    var wo [10]worker 
  
    // 第一步, 第二步
    for i := 0; i < 10; i++ {
        wo[i] = createWorker(i, wo[i])
    }
    // 第三步
    for i := 0; i < 10; i++ {
        wo[i].c <- 'a' + i
        // 这里有一个通道, 一直等着. 等着取数据.
        <- wo[i].done
    }
    for i := 0; i < 10; i++ {
        wo[i].c <- 'A' + i
        <- wo[i].done
    }
    //time.Sleep(time.Second)
}
func main() {
    //bufferedChannel()
    chanDemo()
}


分析:


1. 定义了一个worker类型, 里面有两个管道. 第一个管道是用来传输字母的. 第二个管道用来标记传送行为是否完成

2. 接下来看chanDemo, chanDemo是一个主goroutine. 在这里:

  第一步: 创建了10个工作者.


  第二步: 开了10个goroutine 用来传输字母. 开goroutine调用createWorker, 然后worker的工作是work


  第三步: for循环为每一个goroutine添加字母. 然后等待.......直到对应的done channel完成, 取出结果. 继续往下执行.


for i := 0; i < 10; i++ {
   wo[i].c <- 'a' + i
   // 这里有一个通道, 一直等着. 等着取数据.
   <- wo[i].done
}


放数据: wo[i].c <- 'a' + i . 然后就等待......等待到什么时候呢? wo[i].done中有数据可以取出.


  这就让主goroutine保证了10个goroutine都执行完以后, 在继续往后执行.


  这解释了为什么goroutine可以让主goroutine等待的原因


输出结果


number: 0, worker, a 
number: 1, worker, b 
number: 2, worker, c 
number: 3, worker, d 
number: 4, worker, e 
number: 5, worker, f 
number: 6, worker, g 
number: 7, worker, h 
number: 8, worker, i 
number: 9, worker, j 
number: 0, worker, A 
number: 1, worker, B 
number: 2, worker, C 
number: 3, worker, D 
number: 4, worker, E 
number: 5, worker, F 
number: 6, worker, G 
number: 7, worker, H 
number: 8, worker, I 
number: 9, worker, J 


这样打印的结果是按照顺序执行的. 一个goroutine执行完了, 才能往另一个goroutine中放数据, 这样效率太低了. 我们换一种方式, 让他不停的打印. 最后一起等待执行完成.


package main
import (
    "fmt"
)
// 定义一个bool类型的chan, 用来和外部通信. 事情做完了, 给外面回复一个done
func work(i int, w worker) {
    for n := range w.c {
        fmt.Printf("number: %d, worker, %c \n", i, n)
        go func() {
            w.done <- true
        }()
    }
}
type worker struct {
    c chan int        // 传递字母的管道
    done chan bool        // 标记字母传递完成的管道
}
// chan也可以作为参数传入. 也可以作为返回值
func createWorker(i int, w worker) worker {
    w = worker{
        c : make(chan int),
        done : make(chan bool),
    }
    go work(i, w)
    return w
}
func chanDemo () {
    // 开了10个协程, 有10个工作者
    var work [10]worker
    for i := 0; i < 10; i++ {
        work[i] = createWorker(i, work[i])
    }
    for i, w := range work {
        w.c <- 'a' + i
    }
    for i, w := range work {
        w.c <- 'A' + i
    }
    for _, w := range work  {
      // 这里有一个通道, 一直等着. 等着取数据.
      // 放了两次, 所以要等待两次都处理完, 在执行后面的结果
        <-w.done
      <-w.done
   }
   //time.Sleep(time.Second)
}
func main() {
    //bufferedChannel()
    chanDemo()
}


这里有两个变化,


1. 我在取结果done的时候, 没有在线等执行完. 而是, 你们去执行吧, 我最后来收结果, 收的顺序也是从1-9的顺序收的. 每个goroutine要有两个结果.


2. 在work具体执行完成的地方, 要定义一个协程. 这样程序才能正常运行, 否则会报deadline异常. 为什么会报异常呢?


  因为, 有两次放数据, 第一次放完了, 往管道done里写了一个数据, 结果没有被收走, 又放了一次..... 所以就发生死锁了.

 

新开一个goroutine, 让他并行的发done. 就可以了


疑问: 为什么定义成goroutine, 他就不会deadline了呢? goroutine还有什么其他的含义?


比如下面这段程序


func main() {
    c := make(chan int)
    c <- 0
}


这么写汇报deadline, 因为管道只有发送方, 没有接收方. 要求必须既有发送方又有接收方

但是这么写就没问题:


func main() {
    c := make(chan int)
    go func() {
        c <- 1
        c <- 2
        c <- 3
        c <- 4
    }()
}


为啥呢?


经过一番研究过终于明白了, 看下面这段程序


func main() {
    //bufferedChannel()
    //channelClose()
    c := make(chan int)
    //c <- 0
    go func() {
        log.Info("11")
        c <- 1
        log.Info("22")
        c <- 2
        log.Info("33")
        c <- 3
        log.Info("44")
        c <- 4
    }()
    //log.Info("1, ", <- c)
    //log.Info("2, ", <- c )
    //log.Info("3, ", <- c )
    //log.Info("4, ", <- c )
    time.Sleep(time.Second)
}


你运行执行一下, 看看结果, 只打印出了11


[2020/02/21 07:09:58] channelDemo.go:83 [Info] 11
Process finished with exit code 0


原来goroutine中的代码一直在等待, 知道有人要收数据了,他才会发


func main() {
    //bufferedChannel()
    //channelClose()
    c := make(chan int)
    //c <- 0
    go func() {
        log.Info("11")
        c <- 1
        log.Info("22")
        c <- 2
        log.Info("33")
        c <- 3
        log.Info("44")
        c <- 4
    }()
    log.Info("1, ", <- c)
    //log.Info("2, ", <- c )
    //log.Info("3, ", <- c )
    //log.Info("4, ", <- c )
    time.Sleep(time.Second)
}


这时的打印结果是


[2020/02/21 07:12:08] channelDemo.go:83 [Info] 11
[2020/02/21 07:12:08] channelDemo.go:85 [Info] 22
[2020/02/21 07:12:08] channelDemo.go:93 [Info] 1, 1


看到了吧, 有人收, goroutine才会发, 收几个, 发几个. 其他的保留待发.

 

2. 使用WaitGroup等待任务的结束


sync. WaitGroup()是系统自带的一个等待任务全部完成的工具


  • Add: 添加的任务个数


  • Wait: 等待全部goroutine完成


  • Done: 某一个goroutine完成


我们第一步: 定义一个WaitGroup


// 开启了一个等待任务, 我们通过waitGroup来等待任务的完成
var wg sync.WaitGroup


第二步: 添加20个任务, 因为我们知道有20个任务就直接添加20就好了,如果不知道,可以在for循环里一个个添加


wg.Add(20)


第三步: 等待20个任务全部结束


wg.Wait()


第四步: 完成了一个任务, 就标记他为done


// 定义一个bool类型的chan, 用来和外部通信. 事情做完了, 给外面回复一个done
func work(i int, w worker) {
    for n := range w.c {
        fmt.Printf("number: %d, worker, %c \n", i, n)
        go func() {
            w.wg.Done()
        }()
    }
}


完整代码如下:


package main
import (
    "fmt"
    "sync"
)
// 定义一个bool类型的chan, 用来和外部通信. 事情做完了, 给外面回复一个done
func work(i int, w worker) {
    for n := range w.c {
        fmt.Printf("number: %d, worker, %c \n", i, n)
        go func() {
            w.wg.Done()
        }()
    }
}
type worker struct {
    c chan int             // 传递字母的管道
    wg *sync.WaitGroup        // 标使用waitgroup来标记同步完成
}
// chan也可以作为参数传入. 也可以作为返回值
func createWorker(i int, wg *sync.WaitGroup) worker {
    w := worker{
        c : make(chan int),
        wg : wg,
    }
    go work(i, w)
    return w
}
func chanDemo () {
    // 开了10个协程, 有10个工作者
    var work [10]worker
    // 开启了一个等待任务, 我们通过waitGroup来等待任务的完成
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        work[i] = createWorker(i, &wg)
    }
    wg.Add(20)
    for i, w := range work {
        w.c <- 'a' + i
    }
    for i, w := range work {
        w.c <- 'A' + i
    }
    wg.Wait()
    //time.Sleep(time.Second)
}
func main() {
    //bufferedChannel()
    chanDemo()
}


3. 在go里面函数是一等公民, 其实等待任务结束的过程中, 或者结束时要做很多事情. 我们要是定义成某一个参数,那就只能接收这个参数了,其他都参数不行.


如何能够达到扩展的目的呢?定义成函数


package main
import (
    "fmt"
    "sync"
)
// 定义一个bool类型的chan, 用来和外部通信. 事情做完了, 给外面回复一个done
func work(i int, w worker) {
    for n := range w.c {
        fmt.Printf("number: %d, worker, %c \n", i, n)
        w.wg()
    }
}
type worker struct {
    c chan int        // 传递字母的管道
    wg func()        // 标记字母传递完成的管道
}
// chan也可以作为参数传入. 也可以作为返回值
func createWorker(i int, wg *sync.WaitGroup) worker {
    w := worker{
        c : make(chan int),
        wg : func() {
            fmt.Println("事情1")
            wg.Done()
            fmt.Println("事情2")
        },
    }
    go work(i, w)
    return w
}
func chanDemo () {
    // 开了10个协程, 有10个工作者
    var work [10]worker
    // 开启了一个等待任务, 我们通过waitGroup来等待任务的完成
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        work[i] = createWorker(i, &wg)
    }
    wg.Add(20)
    for i, w := range work {
        w.c <- 'a' + i
    }
    for i, w := range work {
        w.c <- 'A' + i
    }
    wg.Wait()
    //time.Sleep(time.Second)
}
func main() {
    //bufferedChannel()
    chanDemo()
}


四. 使用channel进行树的遍历


这里使用channel进行树的遍历, 是对channel的一个应用.


之前我们对树遍历后处理使用的是函数.我们也可以用channel


比如: 查找树中最大的值


第一步: 循环遍历获取树, 然后将所有树节点放入到channel中. 返回一个管道


第二步: 从管道中取出树的节点, 进行计算


第三步: 在第一步中, 把所有节点都添加到管道中以后, 一定要close


// 定一个管道, 循环遍历, 把遍历后的节点添加到管道中
func (n *TreeNode) TraveresForChannel() chan *TreeNode {
    out := make(chan *TreeNode)
    go func() {
        n.TraveresFunc(func(node *TreeNode) {
            out <- node
        })
        close(out)
    }()
    return out
}


// 从管道中取出所有节点, 取最大值
    max := 0
    for c := range root.TraveresForChannel() {
        if c.Value > max {
            max = c.Value
        }
    }
    fmt.Println(max)


这个逻辑比较清晰.


五. 用select进行调度


1. 首先,我们来从官方文档看一下有关select的描述:


A "select" statement chooses which of a set of possible send or receive operations will proceed. 
It looks similar to a "switch" statement but with the cases all referring to communication operations.
一个select语句用来选择哪个case中的发送或接收操作可以被立即执行。它类似于switch语句,但是它的case涉及到channel有关的I/O操作。


或者换一种说法,select就是用来监听和channel有关的IO操作,当 IO 操作发生时,触发相应的动作。


比如: 现在有两个channel  A 和B, 我要从A 和 B中取值, 谁先来, 就取出谁. 怎么做呢?


package main
import "fmt"
func main() {
    var A, B chan int
    select {
    case n := <- A:
        fmt.Println("select from A: ", n)
    case n := <- B:
        fmt.Println("select from A:", n)
    default:
        fmt.Println("select from default")
    }
}


这里定义了A和B两个channel, 两个channel都是nil. 下面通过select来选择执行, 会走一个默认的default.


输出结果:


select from default


这里A和B都不会有输出, 所以,就走默认的default, 这是非阻塞的方式输出内容. channel是阻塞的, 如果实现非阻塞的呢? 那就是使用select.....default实现


如果没有default又不断的从A和B中取值, 就会deadlock


package main
import "fmt"
func main() {
    var A, B chan int
    select {
    case n := <-A:
        fmt.Println("select from A: ", n)
    case n := <-B:
        fmt.Println("select from A:", n)
    }
}


微信图片_20220510170448.png


我们生成一个channelA和B, 让A和B不停的产生数据, 然后看看select中是否能取出来


package main
import (
    "fmt"
    "math/rand"
    "time"
)
// 生成A 和 B
func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for  {
            // 随机的在1.5s以内休眠
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i ++
        }
    }()
    return out
}
func main() {
    var A, B = generator(), generator()
    // 不停的从A和B中收数据
    for  {
        select {
        case n := <-A:
            fmt.Println("select from A: ", n)
        case n := <-B:
            fmt.Println("select from B:", n)
        }
    }
}


数据是在1.5秒以内随机生成的, 所以, 数据是交叉执行的.


谁有数据, 就取出谁


两个都有, 随机取一个.


如果有default会一直执行default


生成的A和B, 取出来的值, 交给工作者, 让工作者打印出来. 工作者是个管道, 他会一直等着, 有工作来了, 就工作, 没有的时候, 等待


package main
import (
    "fmt"
    "math/rand"
    "time"
)
// 生成A 和 B
func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for  {
            // 随机的在1.5s以内休眠
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i ++
        }
    }()
    return out
}
// 工作者做的具体的工作内容--工作内容, 从管道里不停的取数据
func doWork(i int, c chan int) {
    for cc := range c  {
        fmt.Printf("取出的i:%d, 取出的值时:%d\n", i, cc)
    }
}
// 创建工作者
func createWorker(i int) chan int {
    in := make(chan int)
    // 开始工作
    go doWork(i, in)
    return in
}
func main() {
    var A, B = generator(), generator()
    w := createWorker(0)
    // 不停的从A和B中收数据
    for {
        select {
        case n := <-A:
            w <- n
        case n := <-B:
            w <- n
        }
    }
}


有一个生成数据的管道, 然后从生成数据的管道里取出数据, 将其交给工作者, 让工作者开始工作


goroutine是非抢占式的, 一个通道打开了会一直占有, 直到主动释放. 这里有个问题: 那就是管道取出的一直是第一个的


下面我们在select中既可以向管道中存数据, 又可以取数据

 

package main
import (
    "fmt"
    "math/rand"
    "time"
)
// 生成A 和 B
func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for  {
            // 随机的在1.5s以内休眠
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i ++
        }
    }()
    return out
}
// 工作者做的具体的工作内容--工作内容, 从管道里不停的取数据
func doWork(i int, c chan int) {
    for cc := range c  {
        fmt.Printf("取出的i:%d, 取出的值时:%d\n", i, cc)
    }
}
// 创建工作者
func createWorker(i int) chan int {
    in := make(chan int)
    // 开始工作
    go doWork(i, in)
    return in
}
func main() {
    var A, B = generator(), generator()
    w := createWorker(0)
    n := 0
    hasValue := false
    // 不停的从A和B中收数据
    for {
        var activeWorker chan int
        if hasValue {
            activeWorker = w
        }
        select {
        case n = <-A:
            hasValue = true
        case n = <-B:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }
    }
}


结果:


微信图片_20220510170556.png


依然有问题: 这里生成数据的速度是在1.5秒以内随机的. 如果生成数据的速度快, 消费数据的速度慢. 那么, 就有可能有数据打印不出来. 被跳过了


我们让工作者取数据的速度慢一些


// 工作者做的具体的工作内容--工作内容, 从管道里不停的取数据
func doWork(i int, c chan int) {
    for cc := range c  {
        time.Sleep(time.Second * 5)
        fmt.Printf("取出的i:%d, 取出的值时:%d\n", i, cc)
    }
}


输出结果:


取出的i:0, 取出的值时:0
取出的i:0, 取出的值时:7
取出的i:0, 取出的值时:12
取出的i:0, 取出的值时:20
取出的i:0, 取出的值时:27
取出的i:0, 取出的值时:29
取出的i:0, 取出的值时:36
取出的i:0, 取出的值时:48
取出的i:0, 取出的值时:56


速度一慢下来, 我们发现就丢数据了, 有些数据被跳过了. 那怎么办呢? 我们用一个数组来接收


package main
import (
    "fmt"
    "math/rand"
    "time"
)
// 生成A 和 B
func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for  {
            // 随机的在1.5s以内休眠
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i ++
        }
    }()
    return out
}
// 工作者做的具体的工作内容--工作内容, 从管道里不停的取数据
func doWork(i int, c chan int) {
    for cc := range c  {
        time.Sleep(time.Second * 5)
        fmt.Printf("取出的i:%d, 取出的值时:%d\n", i, cc)
    }
}
// 创建工作者
func createWorker(i int) chan int {
    in := make(chan int)
    // 开始工作
    go doWork(i, in)
    return in
}
func main() {
    var A, B = generator(), generator()
    w := createWorker(0)
    // 不停的从A和B中收数据
    var values []int
    for {
        var activeWorker chan int
        var activeValue int
        if len(values)  > 0 {
            activeValue = values[0]
            activeWorker = w
        }
        select {
        case n := <-A:        // 生成数据的channel
            values = append(values, n)
        case n := <-B:        // 生成数据的channel
            values = append(values, n)
        case activeWorker <- activeValue:        // 消费数据的channel
            /*
             * 取出saveData中的第一个数据,放入到channel中
             * 将第一个数据删除
             */
            values = values[1:]
        }
    }
}


这样就不会丢数据了, 5秒打印一个数据. 那么我们想知道, 现在管道里积压了多少数据, 怎么处理呢?


做这件事之前,我们先来做一件事, 这个程序是一直运行的, 他不会停, 我们设定一个10秒钟. 让程序10秒以后自动退出


使用time.After(10)


package main
import (
    "fmt"
    "math/rand"
    "time"
)
// 生成A 和 B
func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for  {
            // 随机的在1.5s以内休眠
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i ++
        }
    }()
    return out
}
// 工作者做的具体的工作内容--工作内容, 从管道里不停的取数据
func doWork(i int, c chan int) {
    for cc := range c  {
        time.Sleep(time.Second * 5)
        fmt.Printf("取出的i:%d, 取出的值时:%d\n", i, cc)
    }
}
// 创建工作者
func createWorker(i int) chan int {
    in := make(chan int)
    // 开始工作
    go doWork(i, in)
    return in
}
func main() {
    var A, B = generator(), generator()
    w := createWorker(0)
    // 不停的从A和B中收数据
    var values []int
    // time.After返回的是一个管道. 也就是10秒以后, 往管道里放一个数据
    tm := time.After(10 * time.Second)
    for {
        var activeWorker chan int
        var activeValue int
        if len(values)  > 0 {
            activeValue = values[0]
            activeWorker = w
        }
        select {
        case n := <-A:        // 生成数据的channel
            values = append(values, n)
        case n := <-B:        // 生成数据的channel
            values = append(values, n)
        case activeWorker <- activeValue:        // 消费数据的channel
            /*
             * 取出saveData中的第一个数据,放入到channel中
             * 将第一个数据删除
             */
            values = values[1:]
        case <- tm: // 如果从管道中取出来值. 那么执行这个case
            fmt.Println("bye")
            return
        }
    }
}


10后自动结束了


接下来的一个需求, 如果数据生成的速度太慢了, 怎么办呢? 我们增加一个判断, 如果数据生成的速度<800毫秒, 打印一个timeout


func main() {
    var A, B = generator(), generator()
    w := createWorker(0)
    // 不停的从A和B中收数据
    var values []int
    // time.After返回的是一个管道. 也就是10秒以后, 往管道里放一个数据
    tm := time.After(10 * time.Second)
    for {
        var activeWorker chan int
        var activeValue int
        if len(values)  > 0 {
            activeValue = values[0]
            activeWorker = w
        }
        select {
        case n := <-A:        // 生成数据的channel
            values = append(values, n)
        case n := <-B:        // 生成数据的channel
            values = append(values, n)
        case activeWorker <- activeValue:        // 消费数据的channel
            /*
             * 取出saveData中的第一个数据,放入到channel中
             * 将第一个数据删除
             */
            values = values[1:]
        case <- time.After(800 * time.Millisecond):  // 连续打印的两个数据之间时间间隔>800毫秒
                fmt.Println("timeout")
        case <- tm: // 如果从管道中取出来值. 那么执行这个case
            fmt.Println("bye")
            return
            /**
             * 上面这两个case有什么区别呢?
             * tm: 整体select运行的时间, 超过10秒退出
             * time.After(800 * time.Millisecond) : 这个是两个数据打印之间的时间间隔>800毫秒
             */
        }
    }
}


打印结果


微信图片_20220510170755.png

2. 定时2.定时器的使用


我们最后来做这件事: 每秒钟打印出已经积压的数据, 使用time.tick(1s) 这是一个定时器. 返回的也是一个channel, 1秒钟往channel中放一个数据


func main() {
    var A, B = generator(), generator()
    w := createWorker(0)
    // 不停的从A和B中收数据
    var values []int
    // time.After返回的是一个管道. 也就是10秒以后, 往管道里放一个数据
    tm := time.After(10 * time.Second)
    // 定义一个定时器, 每秒钟执行定时任务
    tick := time.Tick(time.Second)
    for {
        var activeWorker chan int
        var activeValue int
        if len(values)  > 0 {
            activeValue = values[0]
            activeWorker = w
        }
        select {
        case n := <-A:        // 生成数据的channel
            values = append(values, n)
        case n := <-B:        // 生成数据的channel
            values = append(values, n)
        case activeWorker <- activeValue:        // 消费数据的channel
            /*
             * 取出saveData中的第一个数据,放入到channel中
             * 将第一个数据删除
             */
            values = values[1:]
        case <- time.After(800 * time.Millisecond):  // 连续打印的两个数据之间时间间隔>800毫秒
                fmt.Println("timeout")
        case <- tm: // 如果从管道中取出来值. 那么执行这个case
            fmt.Println("bye")
            return
            /**
             * 上面这两个case有什么区别呢?
             * tm: 整体select运行的时间, 超过10秒退出
             * time.After(800 * time.Millisecond) : 这个是两个数据打印之间的时间间隔>800毫秒
             */
         case <- tick:
            fmt.Println("积压数据个数:", len(values))
        }
    }
}


输出结果:


微信图片_20220510170833.png


感觉select不是很好理解, 还需要在查询资料, 学习一遍


完整代码


package main
import (
    "fmt"
    "math/rand"
    "time"
)
// 生成A 和 B
func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for  {
            // 随机的在1.5s以内休眠
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i ++
        }
    }()
    return out
}
// 工作者做的具体的工作内容--工作内容, 从管道里不停的取数据
func doWork(i int, c chan int) {
    for cc := range c  {
        time.Sleep(time.Second )
        fmt.Printf("取出的i:%d, 取出的值时:%d\n", i, cc)
    }
}
// 创建工作者
func createWorker(i int) chan int {
    in := make(chan int)
    // 开始工作
    go doWork(i, in)
    return in
}
func main() {
    var A, B = generator(), generator()
    w := createWorker(0)
    // 不停的从A和B中收数据
    var values []int
    // time.After返回的是一个管道. 也就是10秒以后, 往管道里放一个数据
    tm := time.After(10 * time.Second)
    // 定义一个定时器, 每秒钟执行定时任务
    tick := time.Tick(time.Second)
    for {
        var activeWorker chan int
        var activeValue int
        if len(values)  > 0 {
            activeValue = values[0]
            activeWorker = w
        }
        select {
        case n := <-A:        // 生成数据的channel
            values = append(values, n)
        case n := <-B:        // 生成数据的channel
            values = append(values, n)
        case activeWorker <- activeValue:        // 消费数据的channel
            /*
             * 取出saveData中的第一个数据,放入到channel中
             * 将第一个数据删除
             */
            values = values[1:]
        case <- time.After(800 * time.Millisecond):  // 连续打印的两个数据之间时间间隔>800毫秒
                fmt.Println("timeout")
        case <- tm: // 如果从管道中取出来值. 那么执行这个case
            fmt.Println("bye")
            return
            /**
             * 上面这两个case有什么区别呢?
             * tm: 整体select运行的时间, 超过10秒退出
             * time.After(800 * time.Millisecond) : 这个是两个数据打印之间的时间间隔>800毫秒
             */
         case <- tick:
            fmt.Println("积压数据个数:", len(values))
        }
    }
}


go的一个重要特性是channel, go通过通信来共享内存, 而不是通过共享内存来通信. go通过channel实现了这样的一个特性


看上面的demo. 我们定义了两个channel来存入数据, 定义了一个channel来取出数据. 然后对数据进行处理. 在通信的过程中进行了存和取数据的过程. 这就是通过通信来共享内存.

 

六. 传统的同步机制


go建议通过通信来共享内存, 但go本身也是支持传统的同步机制的, 比如锁,


go使用csp模型来实现同步, 建议少使用锁来同步, 锁是通过共享内存来通讯. 但也是可以用的.


下面我们来看如何使用锁.


1. Mutex


我们定义两个同时存数据的场景, 一个取数据的场景. 并发进行, 先不加锁, 看看有什么问题?


package main
import (
    "fmt"
    "time"
)
// 定义一个自定义的类型AtomicInt
type AtomicInt int
// 增加方法
func (a *AtomicInt) increase() {
    *a ++
}
// 取数据的方法
func (a *AtomicInt) get() int{
    return int(*a)
}
func main() {
    var a AtomicInt
    // 存数据
    a.increase()
    // 定义了一个单独的协程去存数据, 这样就有两个协程同时存数据
    go func() {
        a.increase()
    }()
    time.Sleep(time.Second)
    // 取数据---这时取的是1还是2 呢?
    fmt.Println(a.get())
}


这个程序短期看运行不会有什么问题, 但这里确实是有冲突的. 我们用-race来看一下


go run -race AutomicInt.go

微信图片_20220510171015.png


可以看出发生冲突了, 什么冲突呢? 在写数据之前, 发现有个地方在读数据. 阿欧....这可不好. 有同步问题.


下面我们使用Mutex来加锁


package main
import (
    "fmt"
    "sync"
    "time"
)
// 定义一个自定义的类型AtomicInt
type AtomicInt struct {
    value int
    lock sync.Mutex
}
// 增加方法
func (a *AtomicInt) increase() {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.value ++
}
// 取数据的方法
func (a *AtomicInt) get() int{
    a.lock.Lock()
    defer a.lock.Unlock()
    return int(a.value)
}
func main() {
    var a AtomicInt
    // 存数据
    a.increase()
    // 定义了一个单独的协程去存数据, 这样就有两个协程同时存数据
    go func() {
        a.increase()
    }()
    time.Sleep(time.Second)
    // 取数据---这时取的是1还是2 呢?
    fmt.Println(a.get())
}


其实这了展示了lock的用法还是挺简单的. 在结构体里,就是对结构体中的数据进行加锁. 加在哪里, 就是对谁加的.

 

这一章学完了, 感受是: 挺难的, 尤其是select. 又要通过管道读数据, 又要通过管道写数据, 挺费事. 再学一遍. 眼睛透彻了

 

 

参考资料:


1. https://www.cnblogs.com/tobycnblogs/p/9935465.html


2.


相关文章
|
3天前
|
存储 SQL 前端开发
🚀经常发文章的你是否想过定时发布是咋实现的?🚀
🚀经常发文章的你是否想过定时发布是咋实现的?🚀
|
4月前
答知识星球朋友疑问:执行 ABAP 代码出现超时的原因,背后的理论和解决方案试读版
答知识星球朋友疑问:执行 ABAP 代码出现超时的原因,背后的理论和解决方案试读版
17 0
|
5月前
|
Go
一图胜千言,帮你搞懂Go面试中常问的channel问题!
一图胜千言,帮你搞懂Go面试中常问的channel问题!
|
5月前
|
测试技术
【测试平台系列】第一章 手撸压力机(十)-定义场景
上一章,咱们对http请求进行了一些优化,本章节我们将组成场景去运行。首先场景就是一连串的http接口的请求,我们使用list(列表)来组装成一个场景
【测试平台系列】第一章 手撸压力机(十)-定义场景
|
5月前
|
存储 JSON 搜索推荐
【测试平台系列】第一章 手撸压力机(十二)-初步实现提取功能
上一章节,我们主要实现了基础的并发测试场景的能力。本章节,我们实现一下,如何对响应进行提取,使用正则/json对响应信息提取,并赋值给我们定义的变量。
|
5月前
|
测试技术
【测试平台系列】第一章 手撸压力机(九)- 封装函数
将我们的一些代码封装到函数和方法中,这样我们看来代码可读性更好。如果发现bug,也可以更好的进行追踪。
|
5月前
|
测试技术
【测试平台系列】第一章 手撸压力机(八)- 实现testobject接口
上一章中我们已经启动了一个/engine/run/testObject/接口,但是我们还没有具体的实现该接口,本章我们就来实现一下该接口。
【测试平台系列】第一章 手撸压力机(八)- 实现testobject接口
|
5月前
|
测试技术 Go
【测试平台系列】第一章 手撸压力机(七)- 使用gin
今天,我们使用gin框架将压力机做成一个web服务后端。 我们引入gin框架:
【测试平台系列】第一章 手撸压力机(七)- 使用gin
|
前端开发
前端知识案例-队列
前端知识案例-队列
38 0
前端知识案例-队列
|
前端开发 JavaScript C语言
带你读书之“红宝书”:第十章 函数⑤
带你读书之“红宝书”:第十章 函数⑤
51 0
带你读书之“红宝书”:第十章 函数⑤