一、使用channel创建消息生产者、消费者
import ( "fmt" "sync" "testing" ) //生产者 func dataProducer(ch chan int, wg *sync.WaitGroup) { go func() { for i := 0; i < 10; i++ { //给通道添加数据 ch <- i } //关闭channel,向关闭的channel发送数据,会导致panic close(ch) //ch <- -1 //panic: send on closed channel wg.Done() }() } //消费者 func dataReceiver(ch chan int, wg *sync.WaitGroup) { go func() { for { //从通道取数据,ok为bool值,true表示正常接收,false表示通道关闭 //关闭通道,接收数据,返回零值 if data, ok := <-ch; ok { fmt.Println(data) } else { break } } wg.Done() }() } func TestCloseChannel(t *testing.T) { var wg sync.WaitGroup //创建通道 ch := make(chan int) wg.Add(1) //生成数据 dataProducer(ch, &wg) wg.Add(1) //消费数据 dataReceiver(ch, &wg) wg.Add(1) //消费数据 dataReceiver(ch, &wg) wg.Wait() }
=== RUN TestCloseChannel 0 2 3 4 5 6 7 8 9 1 --- PASS: TestCloseChannel (0.00s) PASS
二、使用channel关闭任务
import ( "fmt" "testing" "time" ) //任务是否已被取消 //实现原理: //检查是否从 channel 收到一个消息,如果收到一个消息,我们就返回 true,代表任务已经被取消了 //当没有收到消息,channel 会被阻塞,多路选择机制就会走到 default 分支上去。 func isCanlled(cancelChan chan struct{}) bool { select { //是否收到消息 case <-cancelChan: return true default: return false } } func cancel_1(cancelChan chan struct{}) { //struct{}空结构 struct{}{}实例化空结构 cancelChan <- struct{}{} } //执行任务取消 //因为 close() 是一个广播机制,所以所有的协程都会收到消息 func cancel_2(cancel chan struct{}) { // close(cancel)会使所有处于处于阻塞等待状态的消息接收者(<-cancelChan)收到消息 close(cancel) } func TestCancel(t *testing.T) { cancelChan := make(chan struct{}, 0) for i := 0; i < 5; i++ { go func(i int, cancelCh chan struct{}) { for { if isCanlled(cancelCh) { break } time.Sleep(time.Millisecond * 5) } fmt.Println(i, "Canceled") }(i, cancelChan) } //只有一个被取消 //cancel_1(cancelChan) //全部被取消 cancel_2(cancelChan) time.Sleep(time.Second * 1) }
=== RUN TestCancel 2 Canceled 1 Canceled 3 Canceled 4 Canceled 0 Canceled --- PASS: TestCancel (1.01s) PASS
三、使用Context取消关闭任务
import ( "context" "fmt" "testing" "time" ) //context就是用于管理相关任务的上下文,包含了共享值的传递,超时,取消通知 func isCanlled(ctx context.Context) bool { select { //Done方法返回一个信道(channel),当Context被撤销或过期时,该信道是关闭的,即它是一个表示Context是否已关闭的信号。 //当Done信道关闭后,Err方法表明Context被撤的原因。 case <-ctx.Done(): return true default: return false } } func TestCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) for i := 0; i < 5; i++ { go func(i int, ctx context.Context) { for { if isCanlled(ctx) { break } time.Sleep(time.Millisecond * 5) } fmt.Println(i, "Canceled") }(i, ctx) } cancel() time.Sleep(time.Second * 1) }
=== RUN TestCancel 4 Canceled 0 Canceled 1 Canceled 2 Canceled 3 Canceled --- PASS: TestCancel (1.02s) PASS