一篇文章带你吃透Go语言的Atomic和Channel--实战方法

简介: 一篇文章带你吃透Go语言的Atomic和Channel--实战方法

一篇文章带你吃透Go语言的Atomic和Channel–实战方法

Atomic 要保证原子操作,一定要使用这几种方法

我们在学习 MutexRWMutex 等并发原语的实现时,你可以看到,最底层是通过 atomic 包中的一些原子操作来实现的

你可能会说,这些并发原语已经可以应对大多数的并发场景了,为啥还要学习原子操作呢?其实,这是因为,在很多场景中,使用并发原语实现起来比较复杂,而原子操作可以帮助我们更轻松地实现底层的优化。

之所以叫原子操作,是因为一个原子在执行的时候,其它线程不会看到执行一半的操作结果。在其它线程看来,原子操作要么执行完了,要么还没有执行,就像一个最小的粒子 - 原子一样,不可分割。

原子操作的基础知识

CPU 提供了基础的原子操作,不过,不同架构的系统的原子操作是不一样的。

对于单处理器单核系统来说,如果一个操作是由一个 CPU 指令来实现的,那么它就是原子操作,比如它的 XCHG 和 INC 等指令。如果操作是基于多条指令来实现的,那么,执行的

过程中可能会被中断,并执行上下文切换,这样的话,原子性的保证就被打破了,因为这个时候,操作可能只执行了一半

在多处理器多核系统中,原子操作的实现就比较复杂了。

由于 cache 的存在,单个核上的单个指令进行原子操作的时候,你要确保其它处理器或者核不访问此原子操作的地址,或者是确保其它处理器或者核总是访问原子操作之后的最新

的值。x86 架构中提供了指令前缀 LOCKLOCK 保证了指令(比如 LOCK CMPXCHG op1、op2)不会受其它处理器或 CPU 核的影响,有些指令(比如 XCHG)本身就提供

Lock 的机制。不同的 CPU 架构提供的原子操作指令的方式也是不同的,比如对于多核的 MIPSARM,提供了 LL/SC(Load Link/Store Conditional)指令,可以帮助实现原子

操作(ARMLL/SC 指令 LDREXSTREX)。

因为不同的 CPU 架构甚至不同的版本提供的原子操作的指令是不同的,所以,要用一种编程语言实现支持不同架构的原子操作是相当有难度的。不过,还好这些都不需要你操心,

因为 Go 提供了一个通用的原子操作的 API,将更底层的不同的架构下的实现封装成 atomic

关于 atomic,还有一个地方你一定要记住,atomic 操作的对象是一个地址,你需要把可寻址的变量的地址作为参数传递给方法,而不是把变量的值传递给方法。

基本方法

  • Add 方法就是给第一个参数地址中的值增加一个 delta 值 可以是负值
  • CAS 这个方法会比较当前 addr 地址里的值是不是 old,如果不等于 old,就返回 false;如果等于 old,就把此地址的值替换成 new 值,返回 true。这就相当于“判断相等才替换”。
  • Swap 直接交换值
  • Load 方法会取出 addr 地址中的值
  • Store 方法会把一个值存入到指定的 addr 地址中,即使在多处理器、多核、有 CPU cache 的情况下,这个操作也能保证 Store 是一个原子操作。
  • Value 类型 它可以原子地存取对象类型,但也只能存取,不能 CASSwap,常常用在配置变更等场景中。
type Config struct {
  NodeName string
  Addr     string
  Count    int32
}
func loadNewConfig() Config {
  return Config{NodeName: "北京", Addr: "10.77.95.27", Count: rand.Int31()}
}
func main() {
  var config atomic.Value
  config.Store(loadNewConfig())
  var cond = sync.NewCond(&sync.Mutex{}) // 设置新的config
  go func() {
    for {
      time.Sleep(time.Duration(5+rand.Int63n(5)) * time.Second)
      config.Store(loadNewConfig())
      cond.Broadcast() // 通知等待着配置已变更
    }
  }()
  go func() {
    for {
      cond.L.Lock()
      cond.Wait()                 // 等待变更信号
      c := config.Load().(Config) // 读取新的配置
      fmt.Printf("new config: %+v\n", c)
      cond.L.Unlock()
    }
  }()
  select {}
}

第三方库的扩展

  • uber go/atomic,它定义和封装了几种与常见类型相对应的原子操作类型,这些类型提供了原子操作的方法。这些类型包括 Bool、Duration、Error、Float64、Int32、Int64、String、Uint32、Uint64 等。

atomic 原子操作的应用场景

举个例子:假设你想在程序中使用一个标志(flag,比如一个 bool 类型的变量),来标识一个定时任务是否已经启动执行了,你会怎么做呢?

我们先来看看加锁的方法。如果使用 MutexRWMutex,在读取和设置这个标志的时候加锁,是可以做到互斥的、保证同一时刻只有一个定时任务在执行的,所以使用 Mutex 或者 RWMutex 是一种解决方案。

其实,这个场景中的问题不涉及到对资源复杂的竞争逻辑,只是会并发地读写这个标志,这类场景就适合使用 atomic 的原子操作。具体怎么做呢?

你可以使用一个 uint32 类型的变量,如果这个变量的值是 0,就标识没有任务在执行,如果它的值是 1,就标识已经有任务在完成了。你看,是不是很简单呢?

再来看一个例子。假设你在开发应用程序的时候,需要从配置服务器中读取一个节点的配置信息。而且,在这个节点的配置发生变更的时候,你需要重新从配置服务器中拉取一份

新的配置并更新。你的程序中可能有多个 goroutine 都依赖这份配置,涉及到对这个配置对象的并发读写,你可以使用读写锁实现对配置对象的保护。在大部分情况下,你也可以

利用 atomic 实现配置对象的更新和加载。

使用 atomic 实现 Lock-Free queue

// lock-free的queue
type LKQueue struct {
  head unsafe.Pointer
  tail unsafe.Pointer
} 
// 通过链表实现,这个数据结构代表链表中的节点
type node struct {
  value interface{}
  next  unsafe.Pointer
}
func NewLKQueue() *LKQueue {
  n := unsafe.Pointer(&node{})
  return &LKQueue{head: n, tail: n}
} // 入队
func (q *LKQueue) Enqueue(v interface{}) {
  n := &node{value: v}
  for {
    tail := load(&q.tail)
    next := load(&tail.next)
    if tail == load(&q.tail) { // 尾还是尾
      if next == nil { // 还没有新数据入队
        if cas(&tail.next, next, n) { //增加到队尾
          cas(&q.tail, tail, n) //入队成功,移动尾巴指针
          return
        }
      } else { // 已有新数据加到队列后面,需要移动尾指针
        cas(&q.tail, tail, next)
      }
    }
  }
}
// 出队,没有元素则返回nil
func (q *LKQueue) Dequeue() interface{} {
  for {
    head := load(&q.head)
    tail := load(&q.tail)
    next := load(&head.next)
    if head == load(&q.head) { // head还是那个head
      if head == tail { // head和tail一样
        if next == nil { // 说明是空队列
          return nil
        } // 只是尾指针还没有调整,尝试调整它指向下一个
        cas(&q.tail, tail, next)
      } else { // 读取出队的数据
        v := next.value // 既然要出队了,头指针移动到下一个
        if cas(&q.head, head, next) {
          return v // Dequeue is done. return
        }
      }
    }
  }
}
// 将unsafe.Pointer原子加载转换成node
func load(p *unsafe.Pointer) (n *node) { return (*node)(atomic.LoadPointer(p)) }
// 封装CAS,避免直接将*node转换成unsafe.Pointer
func cas(p *unsafe.Pointer, old, new *node) (ok bool) {
  return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new))
}

Channel:另辟蹊径,解决并发问题

应用场景

  • 数据交流:当作并发的 buffer 或者 queue,解决生产者 - 消费者问题。多个 goroutine 可以并发当作生产者(Producer)和消费者(Consumer)。
  • 数据传递:一个 goroutine 将数据交给另一个 goroutine,相当于把数据的拥有权 (引用) 托付出去。
  • 信号通知:一个 goroutine 可以将信号 (closing、closed、data ready 等) 传递给另一个或者另一组 goroutine
  • 任务编排:可以让一组 goroutine 按照一定的顺序并发或者串行的执行,这就是编排的功能。
  • 锁:利用 Channel 也可以实现互斥锁的机制。

基本了解

通过 make,我们可以初始化一个 chan,未初始化的 chan 的零值是 nil。你可以设置它的容量,比如下面的 chan 的容量是 9527,我们把这样的 chan 叫做 buffered chan;如果

没有设置,它的容量是 0,我们把这样的 chan 叫做 unbuffered chan

如果 chan 中还有数据,那么,从这个 chan 接收数据的时候就不会阻塞,如果 chan 还未满(“满”指达到其容量),给它发送数据也不会阻塞,否则就会阻塞。unbuffered chan 只有读写都准备好之后才不会阻塞,

这也是很多使用 unbuffered chan 时的常见 Bug

  • chan 接收是有两个值的,第一个是通道中的值,第二个表示是否正常接收到那些值,如果第二个值为 falsechan 已经被被 close 而且 chan 中没有缓存的数据了
  • 发送和接收 都可用于 selectcase 语句
  • chan 还可以用作于 for-range 中,可以使用取出的值,也可以什么都不作 用于清空数据

使用 Channel 容易犯的错误

  1. closenilchan
  2. send 已经 closechan
  3. close 已经 closechan

什么时候选择使用 Channel

  1. 共享资源的并发访问使用传统并发原语;
  2. 复杂的任务编排和消息传递使用 Channel
  3. 消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond
  4. 简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
  5. 需要和 Select 语句结合,使用 Channel
  6. 需要和超时配合时,使用 ChannelContext

Channel:透过代码看典型的应用模式

使用反射操作 Channel

select 语句可以处理 chansendrecv,可是,如果要处理 100chan 呢?一万个 chan 呢?

通过 reflect.Select 函数,你可以将一组运行时的 case clause 传入,当作参数执行。Goselect 是伪随机的,它可以在执行的 case 中随机选择一个 case,并把选择的这个 case

的索引(chosen)返回,如果没有可用的 case 返回,会返回一个 bool 类型的返回值,这个返回值用来表示是否有 case 成功被选择。如果是 recv case,还会返回接收的元素。

func main() {
  var ch1 = make(chan int, 10)
  var ch2 = make(chan int, 10)      // 创建SelectCase
  var cases = createCases(ch1, ch2) // 执行10次select
  for i := 0; i < 10; i++ {
    chosen, recv, ok := reflect.Select(cases)
    if recv.IsValid() { // recv case
      fmt.Println("recv:", cases[chosen].Dir, recv, ok)
    } else { // send case
      fmt.Println("send:", cases[chosen].Dir, ok)
    }
  }
}
func createCases(chs ...chan int) []reflect.SelectCase {
  var cases []reflect.SelectCase // 创建recv case
  for _, ch := range chs {
    cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
  } // 创建send case
  for i, ch := range chs {
    v := reflect.ValueOf(i)
    cases = append(cases, reflect.SelectCase{Dir: reflect.SelectSend, Chan: reflect.ValueOf(ch), Send: v})
  }
  return cases
}

典型的应用场景

消息交流

chan 的内部实现看,它是以一个循环队列的方式存放数据,所以,它有时候也会被当成线程安全的队列和 buffer 使用。一个 goroutine 可以安全地往 Channel 中塞数据,另

外一个 goroutine 可以安全地从 Channel 中读取数据,goroutine 就可以安全地实现信息交流了。

第一个例子是 worker 池的例子。他们将用户的请求放在一个 chan Job 中,这个 chan Job 就相当于一个待处理任务队列。除此之外,还有一个 chan chan Job

队列,用来存放可以处理任务的 worker 的缓存队列。 dispatcher 会把待处理任务队列中的任务放到一个可用的缓存队列中,worker 会一直处理它的缓存队列。通过使用 Channel,实现了一个 worker 池的任务处理中心,并且解耦

了前端 HTTP 请求处理和后端任务处理的逻辑。

数据传递

有 4 个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出它自己的编号,要求你编写程序,让输出的编号总是按照 1、2、3、4、1、2、3、4……这个顺序打印出来。

type Token struct{}
func newWorker(id int, ch chan Token, nextCh chan Token) {
  for {
    token := <-ch         // 取得令牌
    fmt.Println((id + 1)) // id从1开始
    time.Sleep(time.Second)
    nextCh <- token
  }
}
func main() {
  chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)} // 创建4个worker
  for i := 0; i < 4; i++ {
    go newWorker(i, chs[i], chs[(i+1)%4])
  }
  //首先把令牌交给第一个worker
  chs[0] <- struct{}{}
  select {}
}

这类场景有一个特点,就是当前持有数据的 goroutine 都有一个信箱,信箱使用 chan 实现,goroutine 只需要关注自己的信箱中的数据,处理完毕后,就把结果发送到下一家的信箱中。

信号通知

chan 类型有这样一个特点:chan 如果为空,那么,receiver 接收数据的时候就会阻塞等待,直到 chan 被关闭或者有新的数据到来。利用这个机制,我们可以实现 wait/notify 的设计模式。

除了正常的业务处理时的 wait/notify,我们经常碰到的一个场景,就是程序关闭的时候,我们需要在退出之前做一些清理(doCleanup 方法)的动作。这个时候,我们经常要使用 chan

比如,使用 chan 实现程序的 graceful shutdown,在退出之前执行一些连接关闭、文件 close、缓存落盘等一些动作。

func main() {
    go func() {
        ...... // 执行业务处理
    }()                                                                  // 处理CTRL+C等中断信号
    termChan := make(chan os.Signal)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <- termChan // 执行退出之前的清理动作
    doCleanup()
    fmt.Println("优雅退出")
}

有时候,doCleanup 可能是一个很耗时的操作,比如十几分钟才能完成,如果程序退出需要等待这么长时间,用户是不能接受的,所以,在实践中,我们需要设置一个最长的等待

时间。只要超过了这个时间,程序就不再等待,可以直接退出。所以,退出的时候分为两个阶段:

  1. closing,代表程序退出,但是清理工作还没做;
  2. closed,代表清理工作已经做完。
func main() {
  var closing = make(chan struct{})
  var closed = make(chan struct{})
  go func() { // 模拟业务处理
    for {
      select {
      case <-closing:
        return
      default: // ....... 业务计算
        time.Sleep(100 * time.Millisecond)
      }
    }
  }()            // 处理CTRL+C等中断信号
  termChan := make(chan os.Signal)
  signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <- termChan
  close(closing) // 执行退出之前的清理动作
  go doCleanup(closed)
  select {
  case <-closed:
  case <-time.After(time.Second):
    fmt.Println("清理超时,不等了")
  }
  fmt.Println("优雅退出")
}
func doCleanup(closed chan struct{}) { time.Sleep((time.Minute))
  close(closed)

使用 chan 也可以实现互斥锁。

要想使用 chan 实现互斥锁,至少有两种方式。一种方式是先初始化一个 capacity 等于 1 的 Channel,然后再放入一个元素。这个元素就代表锁,谁取得了这个元素,就相当于获

取了这把锁。另一种方式是,先初始化一个 capacity 等于 1 的 Channel,它的“空槽”代表锁,谁能成功地把元素发送到这个 Channel,谁就获取了这把锁。

这是使用 Channel 实现锁的两种不同实现方式,我重点介绍下第一种。理解了这种实现方式,第二种方式也就很容易掌握了,我就不多说了。

// 使用chan实现互斥锁
type Mutex struct {
  ch chan struct{}
}
// 使用锁需要初始化
func NewMutex() *Mutex {
  mu := &Mutex{make(chan struct{}, 1)}
  mu.ch <- struct{}{}
  return mu
}
// 请求锁,直到获取到
func (m *Mutex) Lock() {
  <-m.ch
}
// 解锁
func (m *Mutex) Unlock() {
  select {
  case m.ch <- struct{}{}:
  default:
    panic("unlock of unlocked mutex")
  }
} // 尝试获取锁
func (m *Mutex) TryLock() bool {
  select {
  case <-m.ch:
    return true
  default:
  }
  return false
} // 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
  timer := time.NewTimer(timeout)
  select {
  case <-m.ch:
    timer.Stop()
    return true
  case <-timer.C:
  }
  return false
}
// 锁是否已被持有
func (m *Mutex) IsLocked() bool {
  return len(m.ch) == 0
}
func main() {
  m := NewMutex()
  ok := m.TryLock()
  fmt.Printf("locked v %v\n", ok)
  ok = m.TryLock()
  fmt.Printf("locked %v\n", ok)
}

你可以用 buffer 等于 1 的 chan 实现互斥锁,在初始化这个锁的时候往 Channel 中先塞入一个元素,谁把这个元素取走,谁就获取了这把锁,把元素放回去,就是释放了锁。元

素在放回到 chan 之前,不会有 goroutine 能从 chan 中取出元素的,这就保证了互斥性。

在这段代码中,还有一点需要我们注意下:利用 select+chan 的方式,很容易实现 TryLockTimeout 的功能。具体来说就是,在 select 语句中,我们可以使用 default

TryLock,使用一个 Timer 来实现 Timeout 的功能。

任务编排

我们学习了 WaitGroup,我们可以利用它实现等待模式:启动一组 goroutine 执行任务,然后等待这些任务都完成。其实,我们也可以使用 chan 实现

WaitGroup 的功能。我来重点介绍下多个 chan 的编排方式,总共 5 种,分别是 Or-Done 模式扇入模式扇出模式StreamMap-Reduce

Or-Done 模式

我们会使用“信号通知”实现某个任务执行完成后的通知机制,在实现时,我们为这个任务定义一个类型为 chan struct{} 类型的 done 变量,等任务结束后,我们就可以 close

个变量,然后,其它 receiver 就会收到这个通知。这是有一个任务的情况,如果有多个任务,只要有任意一个任务执行完,我们就想获得这个信号,这就是 Or-Done 模式。

func or(channels ...<-chan interface{}) <-chan interface{} { // 特殊情况,只有零个或者1个chan
  switch len(channels) {
  case 0:
    return nil
  case 1:
    return channels[0]
  }
  orDone := make(chan interface{})
  go func() {
    defer close(orDone)
    switch len(channels) {
    case 2: // 2个也是一种特殊情况
      select {
      case <-channels[0]:
      case <-channels[1]:
      }
    default: //超过两个,二分法递归处理
      m := len(channels) / 2
      select {
      case <-or(channels[:m]...):
      case <-or(channels[m:]...):
      }
    }
  }()
  return orDone
}
func sig(after time.Duration) <-chan interface{} {
  c := make(chan interface{})
  go func() {
    defer close(c)
    time.Sleep(after)
  }()
  return c
}
func main() {
  start := time.Now()
  <-or(sig(10*time.Second), sig(20*time.Second), sig(30*time.Second), sig(40*time.Second), sig(50*time.Second), sig(01*time.Minute))
  fmt.Printf("done after %v", time.Since(start))
}

这里的实现使用了一个巧妙的方式,当 chan 的数量大于 2 时,使用递归的方式等待信号。

扇入模式

扇入借鉴了数字电路的概念,它定义了单个逻辑门能够接受的数字信号输入最大量的术语。一个逻辑门可以有多个输入,一个输出。

在软件工程中,模块的扇入是指有多少个上级模块调用它。而对于我们这里的 Channel 扇入模式来说,就是指有多个源 Channel 输入、一个目的 Channel 输出的情况。扇入比就

是源 Channel 数量比 1。 每个源 Channel 的元素都会发送给目标 Channel,相当于目标 Channelreceiver 只需要监听目标 Channel,就可以接收所有发送给源 Channel 的数据。

func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
  out := make(chan interface{})
  go func() {
    defer close(out) // 构造SelectCase slice
    var cases []reflect.SelectCase
    for _, c := range chans {
      cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c)})
    } // 循环,从cases中选择一个可用的
    for len(cases) > 0 {
      i, v, ok := reflect.Select(cases)
      if !ok { // 此channel已经close
        cases = append(cases[:i], cases[i+1:]...)
        continue
      }
      out <- v.Interface()
    }
  }()
  return out
}
扇出模式

有扇入模式,就有扇出模式,扇出模式是和扇入模式相反的。

扇出模式只有一个输入源 Channel,有多个目标 Channel,扇出比就是 1 比目标 Channel 数的值,经常用在设计模式中的观察者模式中(观察者设计模式定义了对象间的一种一

对多的组合关系。这样一来,一个对象的状态发生变化时,所有依赖于它的对象都会得到通知并自动刷新)。在观察者模式中,数据变动后,多个观察者都会收到这个变更信号。

下面是一个扇出模式的实现。从源 Channel 取出一个数据后,依次发送给目标 Channel

在发送给目标 Channel 的时候,可以同步发送,也可以异步发送:

func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
  go func() {
    defer func() { //退出时关闭所有的输出chan 
      for i := 0; i < len(out); i++ {
        close(out[i])
      }
    }()
    for v := range ch { // 从输入chan中读取数据
      v := v
      for i := 0; i < len(out); i++ {
        i := i
        if async { //异步
          go func() {
            out[i] <- v // 放入到输出chan中,异步方式
          }()
        } else {
          out[i] <- v // 放入到输出chan中,同步方式
        }
      }
    }
  }()
}
Stream

这里我来介绍一种把 Channel 当作流式管道使用的方式,也就是把 Channel 看作流 (Stream),提供跳过几个元素,或者是只取其中的几个元素等方法

func asStream(done <-chan struct{}, values ...interface{}) <-chan interface{} {
    s := make(chan interface{}) //创建一个unbuffered的channel
    go func() {                 // 启动一个goroutine,往s中塞数据
        defer close(s)             // 退出时关闭chan
        for _, v := range values { // 遍历数组
            select {
                case <-done:
                    return
                case s <- v: // 将数组元素塞入到chan中
            }
        }
    }()
    return s
}

流创建好以后,该咋处理呢?下面我再给你介绍下实现流的方法。

  1. takeN:只取流中的前 n 个数据;
  2. takeFn:筛选流中的数据,只保留满足条件的数据;
  3. takeWhile:只取前面满足条件的数据,一旦不满足条件,就不再取;
  4. skipN:跳过流中前几个数据;
  5. skipFn:跳过满足条件的数据;
  6. skipWhile:跳过前面满足条件的数据,一旦不满足条件,当前这个元素和以后的元素都
    会输出给 Channelreceiver
func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
  takeStream := make(chan interface{}) // 创建输出流
  go func() {
    defer close(takeStream)
    for i := 0; i < num; i++ { // 只读取前num个元素
      select {
      case <-done:
        return
      case takeStream <- <-valueStream: //从输入流中读取元素
      }
    }
  }()
  return takeStream
}
Map-Reduce

单机单进程的 map-reduce 方法。map-reduce 分为两个步骤,第一步是映射(map),处理队列中的数据,第二步是规约 (reduce),把列表中的每一个元素按照一定的处理方式处理成结果,放入到结果队列中。

就像做汉堡一样,map 就是单独处理每一种食材,reduce 就是从每一份食材中取一部分,做成一个汉堡。

func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
  out := make(chan interface{}) //创建一个输出chan
  if in == nil {                // 异常检查
    close(out)
    return out
  }
  go func() { // 启动一个goroutine,实现map的主要逻辑
    defer close(out)
    for v := range in { // 从输入chan读取数据,执行业务操作,也就是map操作
      out <- fn(v)
    }
  }()
  return out
}
func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
  if in == nil { // 异常检查
    return nil
  }
  out := <-in         // 先读取第一个元素
  for v := range in { // 实现reduce的主要逻辑
    out = fn(out, v)
  }
  return out
}
// 生成一个数据流
func asStream(done <-chan struct{}) <-chan interface{} {
  s := make(chan interface{})
  values := []int{1, 2, 3, 4, 5}
  go func() {
    defer close(s)
    for _, v := range values { // 从数组生成
      select {
      case <-done:
        return
      case s <- v:
      }
    }
  }()
  return s
}
func main() {
  in := asStream(nil)                                              // map操作: 乘以10
  mapFn := func(v interface{}) interface{} { return v.(int) * 10 } // reduce操作: 对map的结果进行累加
  reduceFn := func(r, v interface{}) interface{} { return r.(int) + v.(int) }
  sum := reduce(mapChan(in, mapFn), reduceFn) //返回累加结果
  fmt.Println(sum)
}


目录
相关文章
|
17天前
|
存储 Go 索引
go语言中数组和切片
go语言中数组和切片
26 7
|
17天前
|
Go 开发工具
百炼-千问模型通过openai接口构建assistant 等 go语言
由于阿里百炼平台通义千问大模型没有完善的go语言兼容openapi示例,并且官方答复assistant是不兼容openapi sdk的。 实际使用中发现是能够支持的,所以自己写了一个demo test示例,给大家做一个参考。
|
17天前
|
程序员 Go
go语言中结构体(Struct)
go语言中结构体(Struct)
92 71
|
16天前
|
存储 Go 索引
go语言中的数组(Array)
go语言中的数组(Array)
100 67
|
17天前
|
存储 Go
go语言中映射
go语言中映射
32 11
|
19天前
|
Go
go语言for遍历映射(map)
go语言for遍历映射(map)
29 12
|
18天前
|
Go 索引
go语言使用索引遍历
go语言使用索引遍历
26 9
|
18天前
|
Go 索引
go语言使用range关键字
go语言使用range关键字
24 7
|
18天前
|
Go 索引
go语言修改元素
go语言修改元素
25 6
|
8天前
|
Go 数据安全/隐私保护 UED
优化Go语言中的网络连接:设置代理超时参数
优化Go语言中的网络连接:设置代理超时参数