大丈夫当雄飞,安能雌伏?——《后汉书》
1. 前言
pipeline或多或少大家都知道,就是一种流式处理或者批处理技术的抽象,重点解决数据的传输,操作以及传回。redis pipeline
就是利用这个思想实现批处理操作,大家下去自行研究。
2. pipeline的最佳实践
我们实现一个需求,输入一个数组[1,2,3,4],经过各种stage(一个stage就是输入,操作和回传),输出[6,10,14,18]
2.1 stage-1 每个数✖️2
multiply := func( done <- chan interface{}, intStream <- chan int, multiplier int, ) <- chan int { multipliedStream := make(chan int) go func() { defer close(multipliedStream) for i := range intStream { select { case <- done: return case multipliedStream <- i*multiplier: } } }() return multipliedStream }
这里就是把输入的每个数、乘以2 在放到chan中 最后将chan返回。我们可以看到stage:输入是intStream,操作是乘以2,传回是另一个chan,核心就是输入是chan,处理,回传是另一个chan。
2.2 stage-2 每个数+1
add := func( done <- chan interface{}, intStream <- chan int, additive int, ) <- chan int { addedStream := make(chan int) go func() { defer close(addedStream) for i := range intStream { select { case <- done: return case addedStream <- i+additive: } } }() return addedStream }
逻辑同上
2.3 把输入数组转换成chan序列
gen := func( done <- chan interface{}, integers ...int, ) <- chan int { intStream := make(chan int) go func() { defer close(intStream) for _, i:= range integers { select { case <-done: return case intStream <- i: } } }() return intStream }
这个函数核心就是将输入的数组转为chan的序列,供下游pipeline处理。
3. 运行
done := make(chan interface{}) defer close(done) intStram := gen(done, 1,2,3,4) pipeline := multiply(done, add(done, multiply(done, intStram, 2), 1), 2) for v := range pipeline { fmt.Println(v) }
输出:
6 10 14 18
4. 小结
大家看到了吗,我们可以调整add和multi的位置,或者各种组合,这种场景最适合我们业务在获取到一批数据的时候还会经过其他stage处理返回,可能经历过很多stage之后才会得出结果,这种场景不妨试试pipeline技术,效果非常惊人!
和责任链模式有区别哦,责任链数据一致传递,中途不会返回的哦
5. 关注公众号
微信公众号:堆栈future