channel 实战应用,这篇就够了!

简介: channel 实战应用,这篇就够了!

由 channel 引发的血案


上面那篇文章漏了一个我觉得很关键的知识点,并且我们还经常在上面犯错误。即使是那些牛逼的开源项目,也有过类似 bug。

这两天有幸协助自己部门面试,担任一面面试官的时候,两个面试者我都有问个这个问题。但是让我奇怪的是,一个只提了一种情况,另一个直接回答我没遇到过,一度让我怀疑人生。

我的问题是:channel 的哪些操作会引发 panic?

1.关闭一个 nil 值 channel 会引发 panic。

package main
func main() {
  var ch chan struct{}
  close(ch)
}

1668494025846.jpg


2.关闭一个已关闭的 channel 会引发 panic。

package main
func main() {
  ch := make(chan struct{})
  close(ch)
  close(ch)
}

1668494052056.jpg


3.向一个已关闭的 channel 发送数据。

package main
func main() {
  ch := make(chan struct{})
  close(ch)
  ch <- struct{}{}
}

1668494076929.jpg

以上三种 channel 操作会引发 panic。

你可能会说,我咋么会犯这么愚蠢的错误。这只是一个很简单的例子,实际项目是很复杂的,一不小心,你就会忘了自己曾在哪一个 g 里关闭过 channel。

如果你对某块代码没有安全感,相信我,就算它中午不出事,早晚也得出事。


channel 的一些应用


  • 信号通知
  • 超时控制
  • 生产消费模型
  • 数据传递
  • 控制并发数
  • 互斥锁
  • one million......


1.信号通知


经常会有这样的场景,当信息收集完成,通知下游开始计算数据。

package main
import (
  "fmt"
  "time"
)
func main() {
  isOver := make(chan struct{})
  go func() {
    collectMsg(isOver)
  }()
  <-isOver
  calculateMsg()
}
// 采集
func collectMsg(isOver chan struct{}) {
  time.Sleep(500 * time.Millisecond)
  fmt.Println("完成采集工具")
  isOver <- struct{}{}
}
// 计算
func calculateMsg() {
  fmt.Println("开始进行数据分析")
}

如果只是单纯的使用通知操作,那么类型就使用 struct{}。因为空结构体在 go 中是不占用内存空间的,不信你看。

package main
import (
"fmt"
"unsafe"
)
func main() {
  res := struct{}{}
  fmt.Println("占用空间:", unsafe.Sizeof(res))
}
//占用空间: 0


2.执行任务超时


我们在做任务处理的时候,并不能保证任务的处理时间,通常会加上一些超时控制做异常的处理。

package main
import (
  "fmt"
  "time"
)
func main() {
  select {
  case <-doWork():
    fmt.Println("任务结束")
  case <-time.After(1 * time.Second):
    fmt.Println("任务处理超时")
  }
}
func doWork() <-chan struct{} {
  ch := make(chan struct{})
  go func() {
    // 任务处理耗时
    time.Sleep(2 * time.Second)
  }()
  return ch
}


3.生产消费模型


生产者只需要关注生产,而不用去理会消费者的消费行为,更不用关心消费者是否执行完毕。而消费者只关心消费任务,而不需要关注如何生产。

package main
import (
  "fmt"
  "time"
)
func main() {
  ch := make(chan int, 10)
  go consumer(ch)
  go producer(ch)
  time.Sleep(3 * time.Second)
}
// 一个生产者
func producer(ch chan int) {
  for i := 0; i < 10; i++ {
    ch <- i
  }
  close(ch)
}
// 消费者
func consumer(task <-chan int) {
  for i := 0; i < 5; i++ {
    // 5个消费者
    go func(id int) {
      for {
        item, ok := <-task
        // 如果等于false 说明通道已关闭
        if !ok {
          return
        }
        fmt.Printf("消费者:%d,消费了:%d\n", id, item)
        // 给别人一点机会不会吃亏
        time.Sleep(50 * time.Millisecond)
      }
    }(i)
  }
}


4.数据传递


极客上一道有意思的题,假设有4个 goroutine,编号为1,2,3,4。每秒钟会有一个 goroutine 打印出它自己的编号。现在让你写一个程序,要求输出的编号总是按照1,2,3,4这样的顺序打印。类似下图,

1668494268346.jpg

package main
import (
  "fmt"
  "time"
)
type token struct{}
func main() {
  num := 4
  var chs []chan token
  // 4 个work
  for i := 0; i < num; i++ {
    chs = append(chs, make(chan token))
  }
  for j := 0; j < num; j++ {
    go worker(j, chs[j], chs[(j+1)%num])
  }
  // 先把令牌交给第一个
  chs[0] <- struct{}{}
  select {}
}
func worker(id int, ch chan token, next chan token) {
  for {
    // 对应work 取得令牌
    token := <-ch
    fmt.Println(id + 1)
    time.Sleep(1 * time.Second)
    // 传递给下一个
    next <- token
  }
}


5.控制并发数


我经常会写一些脚本,在凌晨的时候对内或者对外拉取数据,但是如果不对并发请求加以控制,往往会导致 groutine 泛滥,进而打满 CPU 资源。往往不能控制的东西意味着不好的事情将要发生。对于我们来说,可以通过 channel 来控制并发数。

package main
import (
  "fmt"
  "time"
)
func main() {
  limit := make(chan struct{}, 10)
  jobCount := 100
  for i := 0; i < jobCount; i++ {
    go func(index int) {
      limit <- struct{}{}
      job(index)
      <-limit
    }(i)
  }
  time.Sleep(20 * time.Second)
}
func job(index int) {
  // 耗时任务
  time.Sleep(1 * time.Second)
  fmt.Printf("任务:%d已完成\n", index)
}

当然了,sync.waitGroup 也可以.

package main
import (
  "fmt"
  "sync"
  "time"
)
func main() {
  var wg sync.WaitGroup
  jobCount := 100
  limit := 10
  for i := 0; i <= jobCount; i += limit {
    for j := 0; j < i; j++ {
      wg.Add(1)
      go func(item int) {
        defer wg.Done()
        job(item)
      }(j)
    }
    wg.Wait()
  }
}
func job(index int) {
  // 耗时任务
  time.Sleep(1 * time.Second)
  fmt.Printf("任务:%d已完成\n", index)
}


6.互斥锁


我们也可以通过 channel 实现一个小小的互斥锁。通过设置一个缓冲区为1的通道,如果成功地往通道发送数据,说明拿到锁,否则锁被别人拿了,等待他人解锁。

package main
import (
  "fmt"
  "time"
)
type ticket struct{}
type Mutex struct {
  ch chan ticket
}
// 创建一个缓冲区为1的通道作
func newMutex() *Mutex {
  return &Mutex{ch: make(chan ticket, 1)}
}
// 谁能往缓冲区为1的通道放入数据,谁就获取了锁
func (m *Mutex) Lock() {
  m.ch <- struct{}{}
}
// 解锁就把数据取出
func (m *Mutex) unLock() {
  select {
  case <-m.ch:
  default:
    panic("已经解锁了")
  }
}
func main() {
  mutex := newMutex()
  go func() {
    // 如果是1先拿到锁,那么2就要等1秒才能拿到锁
    mutex.Lock()
    fmt.Println("任务1拿到锁了")
    time.Sleep(1 * time.Second)
    mutex.unLock()
  }()
  go func() {
    mutex.Lock()
    // 如果是2拿先到锁,那么1就要等2秒才能拿到锁
    fmt.Println("任务2拿到锁了")
    time.Sleep(2 * time.Second)
    mutex.unLock()
  }()
  time.Sleep(500 * time.Millisecond)
  // 用了一点小手段这里最后才能拿到锁
  mutex.Lock()
  mutex.unLock()
  close(mutex.ch)
}

到这里,这篇文章已经尾声了。当然我只是列举了部分 channel 的应用场景。你完全可以发挥自己的想象,在实际工作中,构建更完美且贴近生产的设计。

相关文章
|
前端开发 网络协议 Dubbo
超详细Netty入门,看这篇就够了!
本文主要讲述Netty框架的一些特性以及重要组件,希望看完之后能对Netty框架有一个比较直观的感受,希望能帮助读者快速入门Netty,减少一些弯路。
82430 30
超详细Netty入门,看这篇就够了!
|
1月前
|
消息中间件 存储 Kafka
RocketMQ 工作原理图解,看这篇就够了!
本文详细解析了 RocketMQ 的核心架构、消息领域模型、关键特性和应用场景,帮助深入理解消息中间件的工作原理。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
RocketMQ 工作原理图解,看这篇就够了!
|
4月前
|
存储 缓存 移动开发
EaselJS 源码分析系列--第二篇
EaselJS 源码分析系列--第二篇
|
4月前
|
移动开发 前端开发 JavaScript
EaselJS 源码分析系列--第一篇
EaselJS 源码分析系列--第一篇
|
5月前
|
人工智能 机器人
RPA是什么?为啥要学习RPA,看这篇就够了
RPA是什么?为啥要学习RPA,看这篇就够了
312 2
|
存储 自然语言处理 算法
C++进阶之一篇文章教会你什么是map和set(上)
序列式容器和关联式容器 序列式容器: 序列式容器是一组用于存储数据的容器,其中的数据按照它们在容器中的位置进行存储和访问。序列式容器提供了对元素的线性访问和操作,其主要特点包括:
|
存储 数据可视化 安全
彻底搞懂channel原理(一)
彻底搞懂channel原理(一)
197 0
彻底搞懂channel原理(一)
|
存储 安全
彻底搞懂channel原理(二)
彻底搞懂channel原理(二)
261 0
彻底搞懂channel原理(二)
|
消息中间件 存储 Go
彻底搞懂channel原理(三)
彻底搞懂channel原理(三)
240 0
彻底搞懂channel原理(三)
|
消息中间件 Dubbo Java
Rocket 第二章内容介绍|学习笔记
快速学习 Rocket 第二章内容介绍
132 0