golang 流式处理pipeline的最佳实践

简介: golang 流式处理pipeline的最佳实践

大丈夫当雄飞,安能雌伏?——《后汉书》


1. 前言



pipeline或多或少大家都知道,就是一种流式处理或者批处理技术的抽象,重点解决数据的传输,操作以及传回。redis pipeline就是利用这个思想实现批处理操作,大家下去自行研究。


2. pipeline的最佳实践



我们实现一个需求,输入一个数组[1,2,3,4],经过各种stage(一个stage就是输入,操作和回传),输出[6,10,14,18]


2.1 stage-1 每个数✖️2


multiply := func(
  done <- chan interface{},
  intStream <- chan int,
  multiplier int,
 ) <- chan int {
  multipliedStream := make(chan int)
  go func() {
   defer close(multipliedStream)
   for i := range intStream {
    select {
    case <- done:
     return
    case multipliedStream <- i*multiplier:
    }
   }
  }()
  return multipliedStream
 }


这里就是把输入的每个数、乘以2 在放到chan中 最后将chan返回。我们可以看到stage:输入是intStream,操作是乘以2,传回是另一个chan,核心就是输入是chan,处理,回传是另一个chan


2.2 stage-2 每个数+1


add := func(
  done <- chan interface{},
  intStream <- chan int,
  additive int,
 ) <- chan int {
  addedStream := make(chan int)
  go func() {
   defer close(addedStream)
   for i := range intStream {
    select {
    case <- done:
     return
    case addedStream <- i+additive:
    }
   }
  }()
  return addedStream
 }

逻辑同上

2.3 把输入数组转换成chan序列

gen := func(
  done <- chan interface{},
  integers ...int,
  ) <- chan int {
  intStream := make(chan int)
  go func() {
   defer close(intStream)
   for _, i:= range integers {
    select {
     case <-done:
      return
     case intStream <- i:
    }
   }
  }()
  return intStream
 }


这个函数核心就是将输入的数组转为chan的序列,供下游pipeline处理。


3. 运行



done := make(chan interface{})
 defer close(done)
 intStram := gen(done, 1,2,3,4)
 pipeline := multiply(done, add(done, multiply(done, intStram, 2), 1), 2)
 for v := range pipeline {
  fmt.Println(v)
 }


输出:

6
10
14
18


4. 小结



大家看到了吗,我们可以调整add和multi的位置,或者各种组合,这种场景最适合我们业务在获取到一批数据的时候还会经过其他stage处理返回,可能经历过很多stage之后才会得出结果,这种场景不妨试试pipeline技术,效果非常惊人!


和责任链模式有区别哦,责任链数据一致传递,中途不会返回的哦


5. 关注公众号



  微信公众号:堆栈future

相关文章
|
3月前
|
存储 程序员 编译器
Golang 中的字符串:常见错误和最佳实践
Golang 中的字符串:常见错误和最佳实践
|
4月前
|
中间件 测试技术 Go
Golang中的错误处理最佳实践
【7月更文挑战第10天】在Golang中,错误处理是核心且重要的。最佳实践包括:定义明确的错误类型,使用错误链(如`%w`包装错误),始终检查错误(避免忽略),谨慎使用`panic`和`recover`,利用多值返回处理错误,标准化错误处理逻辑,并确保测试错误处理代码。这些做法有助于构建健壮和可维护的程序。
|
6月前
|
存储 测试技术 Go
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
KisFlow项目源码位于&lt;https://github.com/aceld/kis-flow,初始阶段涉及项目构建和基础模块定义。首先在GitHub创建仓库,克隆到本地。项目目录包括`common/`, `example/`, `function/`, `conn/`, `config/`, `flow/`, 和 `kis/`。`go.mod`用于包管理,`KisLogger`接口定义了日志功能,提供不同级别的日志方法。默认日志对象`kisDefaultLogger`打印到标准输出。
666 7
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
|
6月前
|
设计模式 Go 调度
Golang深入浅出之-Go语言中的并发模式:Pipeline、Worker Pool等
【5月更文挑战第1天】Go语言并发模拟能力强大,Pipeline和Worker Pool是常用设计模式。Pipeline通过多阶段处理实现高效并行,常见问题包括数据竞争和死锁,可借助通道和`select`避免。Worker Pool控制并发数,防止资源消耗,需注意任务分配不均和goroutine泄露,使用缓冲通道和`sync.WaitGroup`解决。理解和实践这些模式是提升Go并发性能的关键。
79 2
golang中的接口和Sort方法的接口最佳实践
golang中的接口和Sort方法的接口最佳实践
|
Go Java 应用服务中间件
golang自定义路由控制实现(二)-流式注册接口以及支持RESTFUL
    先简单回顾一下在上一篇的文章中,上一篇我主要是结合了数组和Map完成路由映射,数组的大小为8,下标为0的代表Get方法,以此类推,而数组的值则是Map,键为URL,值则是我们编写对应的接口。但是上篇的设计仍存在着不足,主要是无法很好的面向RESTFUL设计,同时,我希望还能够希望一个功能,类似于SpringMVC中,可以将@Controller作用于类上,代表着该类下所有接口的一个起始路径。
1467 0
|
2月前
|
Go
Golang语言之管道channel快速入门篇
这篇文章是关于Go语言中管道(channel)的快速入门教程,涵盖了管道的基本使用、有缓冲和无缓冲管道的区别、管道的关闭、遍历、协程和管道的协同工作、单向通道的使用以及select多路复用的详细案例和解释。
117 4
Golang语言之管道channel快速入门篇
|
2月前
|
Go
Golang语言文件操作快速入门篇
这篇文章是关于Go语言文件操作快速入门的教程,涵盖了文件的读取、写入、复制操作以及使用标准库中的ioutil、bufio、os等包进行文件操作的详细案例。
67 4
Golang语言文件操作快速入门篇
|
2月前
|
Go
Golang语言之gRPC程序设计示例
这篇文章是关于Golang语言使用gRPC进行程序设计的详细教程,涵盖了RPC协议的介绍、gRPC环境的搭建、Protocol Buffers的使用、gRPC服务的编写和通信示例。
104 3
Golang语言之gRPC程序设计示例