使用Channel的一些业务场景

简介: 使用Channel的一些业务场景

使用Channel的一些业务场景


首先需要明确的就是,发送方才知道什么时候关闭 channel ,这个是比较符合逻辑的。

我们需要知道哪些情况会使 channel 发生 panic


  • 关闭一个 nil 值会引发
  • 关闭一个已经关闭的 channel 会引发
  • 向一个已经关闭的 channel 发送数据会引发


常见应用场景

  • 信号通知
  • 超时控制
  • 生产消费模型
  • 数据传递
  • 控制并发数
  • 互斥锁
  • one million……


信号通知


如果只是单纯的使用通知操作,那么类型就使用 struct{} 。因为空结构体在 go 中是不占用内存空间的


package main
import (
  "fmt"
  "time"
)
func main() {
  isOver := make(chan struct{})
  go func() {
    collectMsg(isOver)
  }()
  <-isOver
  calculateMsg()
}
// 采集
func collectMsg(isOver chan struct{}) {
  time.Sleep(500 * time.Millisecond)
  fmt.Println("完成采集工具")
  isOver <- struct{}{}
}
// 计算
func calculateMsg() {
  fmt.Println("开始进行数据分析")
}

超时控制


<-time.After(1 * time.Second) 是过了指定时间后返回一个 channelselect case 是哪一个 channel 先有反应就先处理,所以可以做到一个超时控制的作用


func main() {
  select {
  case <-doWork():
    fmt.Println("任务结束")
  case <-time.After(1 * time.Second):
    fmt.Println("任务处理超时")
  }
}
func doWork() <-chan struct{} {
  ch := make(chan struct{})
  go func() {
    // 任务处理耗时
    time.Sleep(2 * time.Second)
    close(ch)
  }()
  return ch
}

消费者模型


这个就不多说了,一个生产,一个消费


数据传递

type token struct{}
func main() {
  num := 4
  var chs []chan token
  // 4 个work
  for i := 0; i < num; i++ {
    chs = append(chs, make(chan token))
  }
  for j := 0; j < num; j++ {
    go worker(j, chs[j], chs[(j+1)%num])
  }
  // 先把令牌交给第一个
  chs[0] <- struct{}{}
  select {}
}
func worker(id int, ch chan token, next chan token) {
  for {
    // 对应work 取得令牌
    token := <-ch
    fmt.Println(id + 1)
    time.Sleep(1 * time.Second)
    // 传递给下一个
    next <- token
  }
}


控制并发数


func main() {
  limit := make(chan struct{}, 10)
  jobCount := 100
  for i := 0; i < jobCount; i++ {
    go func(index int) {
      limit <- struct{}{}
      job(index)
      <-limit
    }(i)
  }
  time.Sleep(20 * time.Second)
}
func job(index int) {
  // 耗时任务
  time.Sleep(1 * time.Second)
  fmt.Printf("任务:%d已完成n", index)
}


我们也可以通过 channel 实现一个小小的互斥锁。通过设置一个缓冲区为1的通道,如果成功地往通道发送数据,说明拿到锁,否则锁被别人拿了,等待他人解锁。


type ticket struct{}
type Mutex struct {
  ch chan ticket
}
// 创建一个缓冲区为1的通道作
func newMutex() *Mutex {
  return &Mutex{ch: make(chan ticket, 1)}
}
// 谁能往缓冲区为1的通道放入数据,谁就获取了锁
func (m *Mutex) Lock() {
  m.ch <- struct{}{}
}
// 解锁就把数据取出
func (m *Mutex) unLock() {
  select {
  case <-m.ch:
  default:
    panic("已经解锁了")
  }
}
func main() {
  mutex := newMutex()
  go func() {
    // 如果是1先拿到锁,那么2就要等1秒才能拿到锁
    mutex.Lock()
    fmt.Println("任务1拿到锁了")
    time.Sleep(1 * time.Second)
    mutex.unLock()
  }()
  go func() {
    mutex.Lock()
    // 如果是2拿先到锁,那么1就要等2秒才能拿到锁
    fmt.Println("任务2拿到锁了")
    time.Sleep(2 * time.Second)
    mutex.unLock()
  }()
  time.Sleep(500 * time.Millisecond)
  // 用了一点小手段这里最后才能拿到锁
  mutex.Lock()
  mutex.unLock()
  close(mutex.ch)
}


目录
相关文章
|
6月前
|
消息中间件 存储 数据库
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
RocketMQ 流存储解析:面向流场景的关键特性与典型案例
88601 9
|
3月前
|
Python
【金融量化】通道突破策略之布林带策略(Bollinger Band )、肯特纳通道策略(Keltner Channel)、唐奇安通道策略(Donchian)原理简介
本文介绍了三种金融量化分析中的通道突破策略:布林带策略(Bollinger Band)、肯特纳通道策略(Keltner Channel)和唐奇安通道策略(Donchian Channel),并提供了每种策略的原理和Python实现代码。
162 2
|
1月前
|
消息中间件 分布式计算 大数据
大数据-75 Kafka 高级特性 稳定性-一致性保证 LogAndOffset(LEO) HightWatermark(HW) 水位/水印
大数据-75 Kafka 高级特性 稳定性-一致性保证 LogAndOffset(LEO) HightWatermark(HW) 水位/水印
43 3
|
2月前
|
消息中间件 NoSQL 中间件
19)消息队列的终极解决方案 Stream
19)消息队列的终极解决方案 Stream
42 0
|
6月前
|
消息中间件 存储 数据库
深度剖析 RocketMQ 5.0,流存储:流场景的诉求是什么?
本文将从使用的角度出发,来更详细的展示一下流存储的场景,看看它和业务消息的场景有哪些区别。 RocketMQ 5.0 面向流存储的场景,提供了哪些特性。再结合两个数据集成的案例,来帮助大家了解流存储的用法。
3505 2
|
消息中间件 存储 运维
基于 RocketMQ Connect 构建数据流转处理平台
基于 RocketMQ Connect 构建数据流转处理平台
基于 RocketMQ Connect 构建数据流转处理平台
|
消息中间件 存储 运维
阿里云消息队列 RocketMQ 5.0 全新升级:消息、事件、流融合处理平台
RocketMQ5.0 的发布标志着阿里云消息从消息领域正式迈向了“消息、事件、流”场景大融合的新局面。未来阿里云消息产品的演进也将继续围绕消息、事件、流核心场景而开展。
1161 1
阿里云消息队列 RocketMQ 5.0 全新升级:消息、事件、流融合处理平台
|
消息中间件 Kafka
kafka学习六-生产延迟操作
这里思考问题,什么时候会用到延迟组件,同时哪些时候会用到延迟组件,同时为什么要用延迟组件? 从kafkaApi中,我们可以知道具体的逻辑实现都是在这里实现的: DelayedOperation调用过程 同时基于时间轮进行时间过期的检测操作。 也即从这里我们可以看到DelayedProduce是协助副本管理器完成相应的延迟操作的,而副本管理器则主要是完成将生产者发送的消息写入到leader副本、管理follwer副本与leader副本之间的同步以及副本角色之间的转换。在上面的生产延迟中,我们可以看到在消息写入leader副本时需要DelayedProdue的协助。同时我们也可以看到:当生产请求的
221 0
kafka学习六-生产延迟操作
|
消息中间件 存储 运维
让数据流动起来,RocketMQ Connect 技术架构解析
本文介绍了 RocketMQ Connect 的概念,然后讲解了 RocketMQ Connect 的实现原理,对服务发现,配置同步,位点同步,负载均衡都有了初步的介绍,接着以 MySqlSourceConnector 为例讲解了如何自己实现一个 Connector,最后对 Connect API 和生态做了一些介绍,提供了一些 RocketMQ Connect 相关的上手教程。
让数据流动起来,RocketMQ Connect 技术架构解析
|
设计模式 缓存 网络协议
Netty4 Channel 概述(通道篇)
Netty4 Channel 概述(通道篇)
Netty4 Channel 概述(通道篇)