[TOC]
1 goroutine
package main
import (
"fmt"
"time"
)
// spinner draws a rotating bar on the current terminal line, showing the
// next frame every 100ms. It never returns, so it is meant to run in its own
// goroutine; it stops when the program exits.
func spinner() {
	const frames = `\|/`
	const delay = 100 * time.Millisecond
	for i := 0; ; i = (i + 1) % len(frames) {
		fmt.Printf("\r%c", frames[i])
		time.Sleep(delay)
	}
}
// fabi returns the n-th Fibonacci number (fabi(0)=0, fabi(1)=1) using the
// naive doubly-recursive definition. Its exponential running time keeps the
// caller busy long enough for a concurrently running spinner to be visible.
// For n < 2 (including negative n) it returns n unchanged.
func fabi(n int) int {
	switch {
	case n < 2:
		return n
	default:
		return fabi(n-2) + fabi(n-1)
	}
}
// main starts the spinner in a background goroutine and then computes
// fabi(45) in the foreground. Printing the result and returning from main
// ends the program, which also tears down the spinner goroutine.
func main() {
	go spinner()
	fmt.Println(fabi(45))
}
goroutine 是Go语言并发编程的基本单元。主goroutine就是main函数所在的goroutine,当主goroutine退出(即main函数返回)时,所有其他goroutine都会被直接终止。一个goroutine在它的函数返回时会自然结束;但除了从main函数返回或直接终止程序之外,没有办法让一个goroutine从外部强行终止另一个goroutine。不过后续可以通过channel的方式通知goroutine自行退出。
2 并发的clock
并发的clock分为server和client,其中server监听某个端口,当有client连接后,server会每隔1s向client发送一次当前时间。
// client.go
package main
import (
"fmt"
"io"
"log"
"net"
"os"
)
// main connects to the clock server on localhost:8000 and copies everything
// the server sends to stdout until the server closes the connection.
func main() {
	conn, err := net.Dial("tcp", "localhost:8000")
	if err != nil {
		// Include the underlying error: "connection refused" and a name
		// resolution failure need different fixes.
		log.Fatalf("Failed to Dial: %v", err)
	}
	defer conn.Close()
	mustCopy(os.Stdout, conn)
}
// mustCopy copies everything from r to w until r reports EOF, then prints on
// its own line how many bytes were transferred. Any copy error terminates the
// program via log.Fatal.
func mustCopy(w io.Writer, r io.Reader) {
	// io.Copy copies from src to dst until either EOF is reached on src or an
	// error occurs; reaching EOF is not reported as an error.
	n, err := io.Copy(w, r) // n, not len: don't shadow the builtin
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%d bytes are received from the server\n", n)
}
// server.go
package main
import (
"flag"
"io"
"log"
"net"
"os"
"strconv"
"time"
)
// main runs the clock server. It listens on localhost:<port> (default 8000,
// configurable with -port) and serves each accepted connection in its own
// goroutine, so several clients are handled concurrently.
func main() {
	port := flag.Uint("port", 8000, "The port of the clock server")
	flag.Parse()
	listener, err := net.Listen("tcp", "localhost:"+strconv.Itoa(int(*port)))
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Print(err) // e.g. an aborted connection; keep serving others
			continue
		}
		// handleConn owns conn and closes it when the client goes away. A
		// `defer conn.Close()` here would never run, because this loop never
		// returns, and it would pile up one deferred call per connection.
		go handleConn(conn)
	}
}
// handleConn writes one line "<$TZ>--HH:MM:SS\n" to c every second. It stops
// and closes c as soon as a write fails (for example because the client
// disconnected).
func handleConn(c net.Conn) {
	defer c.Close()
	for {
		line := os.Getenv("TZ") + "--" + time.Now().Format("15:04:05\n")
		if _, err := io.WriteString(c, line); err != nil {
			return
		}
		time.Sleep(time.Second)
	}
}
//clockWall.go 见习题8.1
package main
import (
"fmt"
"io"
"log"
"net"
"os"
"strings"
"time"
)
// main reads "city=host:port" pairs from the command line and starts one
// clientDial goroutine per valid pair. An argument that does not contain
// exactly one "=" is reported and skipped.
func main() {
	for _, arg := range os.Args[1:] {
		if strings.Count(arg, "=") != 1 {
			fmt.Println("The input is incorrect!")
			continue
		}
		parts := strings.Split(arg, "=")
		city, server := parts[0], parts[1]
		fmt.Printf("city:%s -- server:%s\n", city, server)
		go clientDial(city, server)
	}
	// Keep the main goroutine alive: as soon as main returns, every
	// clientDial goroutine is terminated with it.
	time.Sleep(100 * time.Minute)
}
// clientDial connects to the clock server at server (host:port) and streams
// everything it sends to stdout until the connection ends. city is the label
// given on the command line; it is used to identify the failing server when
// the dial fails, since several dials run concurrently.
func clientDial(city string, server string) {
	conn, err := net.Dial("tcp", server)
	if err != nil {
		fmt.Printf("Failed to Dial %s (%s)! err:%v\n", city, server, err)
		return
	}
	fmt.Println("Dial success")
	defer conn.Close()
	copyData(os.Stdout, conn)
}
// copyData streams r into w until r reaches EOF. Any other error terminates
// the whole program via log.Fatal.
func copyData(w io.Writer, r io.Reader) {
	_, err := io.Copy(w, r)
	if err != nil {
		log.Fatal(err)
	}
}
3 Channel
Channel是routine间的通信机制。
ch := make(chan int)    // ch 的类型是 chan int,无缓冲的channel
ch := make(chan int, 3) // 带缓冲的channel,容量为3
注意上面ch的类型是chan int(chan是关键字,不能用作变量名)。
同map一样,channel是引用类型:赋值和作为参数传递时复制的只是引用,所有副本都指向同一个底层数据结构。
channel的发送和接收都比较简单。
ch := make(chan int)
ch <- x   // 发送
i := <-ch // 接收
channel还支持close操作。向一个已经close的channel发送数据会引发panic。但接收端仍然可以接收close之前已经成功发送的数据;这些数据都被接收完之后,后续的接收操作会立即返回该类型的零值。
无缓冲的channel(用于同步)
package main
import (
"fmt"
"io"
"log"
"net"
"os"
)
// main is a minimal netcat. It connects to localhost:8080, copies stdin to the
// connection, and concurrently copies the connection to stdout. When stdin
// reaches EOF it closes the connection and waits for the background copy to
// finish before exiting.
func main() {
	conn, err := net.Dial("tcp", "localhost:8080")
	if err != nil {
		log.Fatalf("Failed to Dial!err:%s", err.Error())
	}
	defer conn.Close()

	// done carries no data; it only signals that the background copy ended.
	// chan struct{} is a single type, not two arguments to make.
	done := make(chan struct{})
	relay := func() {
		io.Copy(os.Stdout, conn) // error ignored; ends on EOF or when conn is closed below
		fmt.Println("done")
		done <- struct{}{}
	}
	go relay()

	mustCopy(conn, os.Stdin)
	conn.Close()
	<-done // unbuffered: blocks until relay has sent its signal
}
// mustCopy copies src into dst until src reaches EOF. Any other error
// terminates the program, and the underlying error is included in the log so
// the cause (e.g. a reset connection) is visible.
func mustCopy(dst io.Writer, src io.Reader) {
	if _, err := io.Copy(dst, src); err != nil {
		log.Fatalf("Failed to copy: %v", err)
	}
}
如上实现了main goroutine 和子goroutine间的通信:子goroutine结束时,通过done通知main goroutine,main goroutine再结束。
对于无缓冲的channel,main goroutine在<-done
处会阻塞住,直到发送端发送消息。
串联的channel(pipeline)
想实现如下的功能,分为3部分:
- counter - 生成数值
- squarer - 平方数值
- printer - 打印
所以会用到两个channel:naturals 和 squares。
见如下code实现(ver1)
//ver1
package main
import (
"fmt"
"time"
)
// main is the first (intentionally broken) pipeline attempt: counter (main
// itself) -> squarer goroutine -> printer goroutine, connected by two
// unbuffered channels. Neither stage goroutine has a loop, so each handles
// exactly one value and then returns. On the second iteration `naturals <- i`
// has no receiver left, every goroutine is blocked, and the runtime aborts
// with "all goroutines are asleep - deadlock!". Kept as-is to illustrate that.
func main() {
naturals := make(chan uint)
squares := make(chan uint)
// squares: receives a single value, squares it, and exits (no loop)
go func() {
x := <-naturals
fmt.Printf("Squares rcv: %d\n", x)
squares <- x * x
fmt.Printf("Squares send %d\n", x*x)
}()
// printer: prints a single value and exits (no loop)
go func() {
val := <-squares
fmt.Printf("Printer rcv: %d\n", val)
}()
i := uint(0)
for {
i++
naturals <- i
time.Sleep(1 * time.Second) // emit one value per second
}
}
但是run之后,会报如下的error
fatal error: all goroutines are asleep - deadlock!
In https://golang.org/ref/spec#Program_execution, A complete program is created by linking a single, unimported package called the main package with all the packages it imports, transitively. The main package must have package name main and declare a function main that takes no arguments and returns no value.
func main() { … }
Program execution begins by initializing the main package and then invoking the function main. When that function invocation returns, the program exits. It does not wait for other (non-main) goroutines to complete.
上述错误的原因是子goroutine中没有for循环,处理完一个值之后就自动退出了。所以执行完第一遍之后,就只剩下main goroutine了。此时naturals <- i
没有接收方,main goroutine会永远阻塞在这里,所有goroutine都在等待,即dead lock。
关于无缓冲channel上发送和接收的顺序:两者都会阻塞。接收端会一直阻塞直到收到一个消息,发送方会一直阻塞直到消息被接收端接收,所以发送成功和接收成功是同时发生的。比如
package main
import (
"log"
"time"
)
// main shows that a send on an unbuffered channel completes only when a
// receiver takes the value. The receiver sleeps 2s per value and the sender
// only 1s, so the sender is throttled to the receiver's pace and each
// "Send" is logged at the same moment as the matching "Rcv".
func main() {
	ch := make(chan int)
	consume := func() {
		for val := range ch { // ch is never closed, so this loops forever
			log.Printf("Rcv:%d\n", val)
			time.Sleep(2 * time.Second)
		}
	}
	go consume()

	for i := 1; ; i++ {
		ch <- i
		log.Printf("Send %d\n", i)
		time.Sleep(1 * time.Second)
	}
}
从输出可以看出,尽管发送端和接收端的sleep时间不一致,但每个值的Send和Rcv是在同一时刻输出的:发送端被迫跟随接收端每2s一次的节奏。
2019/03/09 22:35:56 Rcv:1
2019/03/09 22:35:56 Send 1
2019/03/09 22:35:58 Rcv:2
2019/03/09 22:35:58 Send 2
2019/03/09 22:36:00 Rcv:3
2019/03/09 22:36:00 Send 3
上述是发送无尽个数值,假如想发送有限个数列呢 ?其实问题就在于怎么让后面的Squarer,Printer知道数列已经发送完成了?可以通过close(naturals)
来实现
//ver1
package main
import (
"fmt"
"time"
)
// main is the second (intentionally broken) pipeline attempt. A counter
// goroutine now sends a finite sequence (0..9, one per second) and then
// closes naturals. However, the squarer and printer still use a plain receive
// (`x := <-naturals`), which cannot tell a real value from "channel closed".
// Once naturals is closed and drained, every receive returns the zero value
// immediately, so the program never terminates and floods the output with
// zeros. Kept as-is to illustrate that pitfall.
func main() {
naturals := make(chan uint, 10)
squares := make(chan uint, 10)
//naturals: produce 0..9, one per second, then close the channel
go func() {
for i := uint(0); i < 10; i++ {
naturals <- i
time.Sleep(1 * time.Second)
}
close(naturals)
}()
// squares: loops forever; after naturals is closed and drained, x is always 0
go func() {
for {
x := <-naturals
fmt.Printf("Squares rcv: %d\n", x)
squares <- x * x
fmt.Printf("Squares send %d\n", x*x)
}
}()
// printer: loops forever; nothing here ever stops it, so main never returns
for {
val := <-squares
fmt.Printf("Printer rcv: %d\n", val)
}
}
当一个channel被close之后,再通过这个channel发送会产生panic;同时接收者也不会再阻塞,在缓冲中的数据被取完之后,每次接收都会立刻返回一个零值。所以,上述的Squares和Printer会一直打印0值。
Printer rcv: 0
Printer rcv: 0
Printer rcv: 0
Printer rcv: 0
Squares send 0
Squares rcv: 0
Squares send 0
Squares rcv: 0
那么有没办法让接收端知道channel是否已经被close ? 有!
//ver1
package main
import (
"fmt"
"time"
)
// main is the third (intentionally broken) pipeline attempt. The squarer now
// uses the two-value receive `x, ok := <-naturals` and stops when naturals is
// closed, but it never closes squares. The printer's `ok` check therefore
// never sees false: after the last square is printed, main blocks on
// <-squares with no sender left, and the runtime reports
// "all goroutines are asleep - deadlock!". Kept as-is to illustrate that a
// stage must close its output channel when it is done.
func main() {
naturals := make(chan uint, 10)
squares := make(chan uint, 10)
//naturals: produce 0..4, one per second, then close the channel
go func() {
for i := uint(0); i < 5; i++ {
naturals <- i
time.Sleep(1 * time.Second)
}
close(naturals)
}()
// squares: stops once naturals is closed and drained (ok == false)
go func() {
for {
x, ok := <-naturals
if !ok {
fmt.Println("naturals has been closed")
// Exits the goroutine WITHOUT closing squares; this is the bug.
break
}
fmt.Printf("Squares rcv: %d\n", x)
squares <- x * x
fmt.Printf("Squares send %d\n", x*x)
}
}()
// printer: ok can only become false if squares is closed, which never happens
for {
val, ok := <-squares
if !ok {
fmt.Println("the squares has been closed!")
break
}
fmt.Printf("Printer rcv: %d\n", val)
}
}
在接收端我们可以通过增加一个参数ok, 来判断channel是否被close。true表示接收值成功,false表示channel已经被close,并且没有值可以读取。
但是在如上的code,还是会输出
fatal error: all goroutines are asleep - deadlock!
原因是在squares goroutine中,break会退出该goroutine,但是channel squares并没有被close。此时printer循环中的val, ok := <-squares依然在等待读取,但已经不会再有任何发送方了,所以就形成了dead lock。
修改方式是:
//ver1
package main
import (
"fmt"
"time"
)
// main runs the counter -> squarer -> printer pipeline over the values 0..4.
// Each stage detects the end of its input with the two-value receive, and the
// squarer closes squares when it finishes (via defer), so the printer loop
// can also terminate and main returns cleanly.
func main() {
	naturals := make(chan uint, 10)
	squares := make(chan uint, 10)

	// Counter: emits 0..4, one per second, then closes naturals.
	go func() {
		var n uint
		for n < 5 {
			naturals <- n
			time.Sleep(1 * time.Second)
			n++
		}
		close(naturals)
	}()

	// Squarer: closing squares on exit is what lets the printer stop.
	go func() {
		defer close(squares)
		for {
			x, more := <-naturals
			if !more {
				fmt.Println("naturals has been closed")
				return
			}
			fmt.Printf("Squares rcv: %d\n", x)
			squares <- x * x
			fmt.Printf("Squares send %d\n", x*x)
		}
	}()

	// Printer: runs in main until squares is closed and drained.
	for {
		v, more := <-squares
		if !more {
			fmt.Println("the squares has been closed!")
			return
		}
		fmt.Printf("Printer rcv: %d\n", v)
	}
}
更优雅的用法是range,range依次从channel中读取数据,当channel被关闭且没有数据时跳出。
//ver1
package main
import (
"fmt"
"time"
)
// main runs the counter -> squarer -> printer pipeline over the values 0..9
// using range loops. Each stage closes its output channel when done, which
// ends the next stage's range loop.
func main() {
	naturals := make(chan uint, 10)
	squares := make(chan uint, 10)

	counterStage := func(out chan<- uint) {
		defer close(out)
		for n := uint(0); n < 10; n++ {
			out <- n
			time.Sleep(1 * time.Second)
		}
	}
	squarerStage := func(in <-chan uint, out chan<- uint) {
		// Closing out is what ends the printer's range loop below.
		defer close(out)
		for v := range in {
			fmt.Printf("squares rcv: %d\n", v)
			out <- v * v
		}
		fmt.Println("naturals has been closed")
	}

	go counterStage(naturals)
	go squarerStage(naturals, squares)

	for v := range squares {
		fmt.Printf("printer rcv: %d\n", v)
	}
	fmt.Println("squares has been closed")
}
其实你并不需要关闭每一个channel。只有当需要告诉接收者goroutine所有的数据已经全部发送时,才需要关闭channel。不管一个channel是否被关闭,当它不再被引用时都会被Go语言
的垃圾回收器回收。(不要将关闭一个打开文件的操作和关闭一个channel操作混淆。对于每个打开的文件,都需要在不使用的时候调用对应的Close方法来关闭文件。)
单方向的channel
单方向的channel也就是只能发送或只能接收的channel。比如说,上文的counter的channel只需要发送功能,printer只负责接收功能,为了code的安全性,所以有单方向的channel的设计。
package main
import (
"fmt"
"time"
)
// counter sends the integers 0 through 9 on out, sleeping one second after
// each send. Before returning it prints "counter is closed"; the deferred
// close then tells downstream stages that no more values are coming.
func counter(out chan<- int) {
	defer close(out)
	i := 0
	for i < 10 {
		out <- i
		time.Sleep(1 * time.Second)
		i++
	}
	fmt.Println("counter is closed")
}
// square reads every value from in and sends its square on out. Once in has
// been closed and drained, it prints "square is closed" and then closes out
// (via defer), so downstream range loops terminate.
func square(in <-chan int, out chan<- int) {
	defer close(out)
	for {
		v, ok := <-in
		if !ok {
			break
		}
		out <- v * v
	}
	fmt.Println("square is closed")
}
// printer prints every value received from in as "Rcv: <n>" and returns once
// in has been closed and drained.
func printer(in <-chan int) {
	for v, ok := <-in; ok; v, ok = <-in {
		fmt.Printf("Rcv: %d\n", v)
	}
}
// main wires up the three-stage pipeline counter -> square -> printer with two
// unbuffered channels. A bidirectional chan int passed where a directional
// channel type is expected is converted implicitly; converting a directional
// channel back to a bidirectional one is not allowed.
func main() {
	naturals := make(chan int)
	squares := make(chan int)
	go counter(naturals)         // chan int -> chan<- int (send-only) inside counter
	go square(naturals, squares) // chan int -> <-chan int (receive-only) and chan<- int (send-only)
	printer(squares)             // chan int -> <-chan int (receive-only); returns once squares is closed
}
带缓冲的channel
带缓冲的channel实际上是将发送和接收解耦了:在缓冲区未满时,即使没有接收者,发送也可以立即完成,效率高一些。
无缓冲的channel提供了更强的同步保证。
package main
import (
"fmt"
"time"
)
// main uses a buffered channel to collect the first result from several
// goroutines. Whichever goroutine finishes its sleep first delivers its value
// first. The buffer has room for all three values, so the two slower senders
// never block on their send.
func main() {
	done := make(chan int, 3)
	// The sleeps are staggered by 10ms rather than 10µs: microsecond gaps are
	// below goroutine start-up and scheduling jitter, so the goroutine with
	// the shortest sleep was not reliably the first to send.
	go func() {
		time.Sleep(910 * time.Millisecond)
		done <- 3
	}()
	go func() {
		time.Sleep(890 * time.Millisecond)
		done <- 1
	}()
	go func() {
		time.Sleep(900 * time.Millisecond)
		done <- 2
	}()
	fmt.Println(<-done)
}
输出通常为1:最先结束sleep的goroutine最先把值放入缓冲channel,main只接收这一个值。由于缓冲容量为3,另外两个goroutine的发送也不会阻塞。
4 并发的循环
现在想调用gopl.io的thumbnail包来处理图片,gopl的相关定义是:
package thumbnail
// ImageFile reads an image from infile and writes
// a thumbnail-size version of it in the same directory.
// It returns the generated file name, e.g. "foo.thumb.jpeg".
// ImageFile reads an image from infile and writes a thumbnail-size version of
// it to the same directory. It returns the generated file name, which is the
// input name with ".thumb" inserted before its extension, e.g.
// "foo.jpeg" -> "foo.thumb.jpeg". The image work is delegated to
// ImageFile2(outfile, infile), whose error is returned unchanged.
func ImageFile(infile string) (string, error) {
	ext := filepath.Ext(infile) // e.g., ".jpg", ".JPEG"
	base := strings.TrimSuffix(infile, ext)
	outfile := base + ".thumb" + ext
	err := ImageFile2(outfile, infile)
	return outfile, err
}
- 首先能想到的就是串行的处理方式
// makeThumbnails creates a thumbnail for each file sequentially, one after
// another. Errors returned by thumbnail.ImageFile are ignored.
func makeThumbnails(filenames []string) {
	for i := range filenames {
		thumbnail.ImageFile(filenames[i])
	}
}
- 但是考虑到效率,现在使用并行的方式来处理(incorrect code)
// makeThumbnails1 (incorrect by design) starts one goroutine per file and
// returns immediately without waiting for any of them. The errors returned by
// thumbnail.ImageFile are discarded. If the caller (e.g. main) returns right
// afterwards, unfinished workers are terminated.
func makeThumbnails1(filenames []string) {
for _, filename := range filenames {
go thumbnail.ImageFile(filename)
}
}
上述的实现是不正确的,因为makeThumbnails1
启动完所有worker goroutine之后就立刻返回了,不会等待worker goroutine执行完;如果随后main函数也返回,还没完成的worker会被直接终止。
- 那么怎么让main groutine知道worker goroutine执行完了呢?方法就是channel
// makeThumbnails3 (first, flawed version) launches one goroutine per file and
// waits for all of them through the done channel.
//
// The goroutine closure reads the loop variable f directly instead of taking
// it as a parameter. Before Go 1.22, f is a single variable shared by all
// iterations, so a goroutine may observe a later file name (the log in the
// text shows 3.jpg started twice). From Go 1.22 on, each iteration has its own
// f. Kept as-is to illustrate the pitfall.
func makeThumbnails3(filenames []string) {
done := make(chan struct{})
for _, f :=range filenames {
go func() {
log.Printf("Start process image %s\n", f)
_, err := thumbnail.ImageFile(f)
if err != nil {
log.Println(err)
}
done <- struct{}{}
}()
}
// One receive per launched goroutine: returns only after all have signalled.
for range filenames {
<-done
}
}
run 之后,发现输出有问题
PS D:\07-go\src\gopl> .\main.exe .\1.jpg .\2.jpg .\3.jpg
2019/03/11 21:58:39 Start process image .\2.jpg
2019/03/11 21:58:39 Start process image .\3.jpg
2019/03/11 21:58:39 Start process image .\3.jpg
执行了2次3.jpg,为什么呢?因为函数字面量捕获的是变量f本身,而不是f在某一时刻的值。在Go 1.22之前,for循环的迭代变量f在所有迭代之间是同一个变量,逻辑如下:
1) goroutine 1 刚启动时f=1.jpg,但它还没来得及读取f。
2) 循环继续执行,f被依次更新为2.jpg、3.jpg;goroutine真正读取f时看到的已经是更新后的值,于是出现了重复处理,而1.jpg没有被处理。
3) 其实这也就是goroutine间共享变量的问题。从Go 1.22起,每次迭代都会创建新的变量f,这个问题不再出现;但下面显式传参的写法在任何版本下都是安全的。
- 应该采用显式传入filename的方式
// makeThumbnails3 generates thumbnails for all filenames in parallel and waits
// until every worker has finished. Errors are logged, not returned.
func makeThumbnails3(filenames []string) {
	done := make(chan struct{})
	// worker handles one file. The name arrives as a parameter, so each
	// goroutine has its own copy instead of sharing the loop variable.
	worker := func(infile string) {
		log.Printf("Start process image %s\n", infile)
		if _, err := thumbnail.ImageFile(infile); err != nil {
			log.Println(err)
		}
		done <- struct{}{}
	}
	for _, f := range filenames {
		go worker(f)
	}
	// One receive per launched goroutine: returns only after all have signalled.
	for i := 0; i < len(filenames); i++ {
		<-done
	}
}
- 假如想实现遇到第一个error之后,直接退出并打印error的功能呢?
// makeThumbnails4 generates thumbnails for filenames in parallel and returns
// the first error reported by any worker, or nil if all succeed.
//
// NOTE: this version intentionally keeps the bug discussed in the text.
// errCh is unbuffered, so after an early return the remaining workers block
// forever on `errCh <- err` and leak.
func makeThumbnails4(filenames []string) error {
	errCh := make(chan error)
	for _, f := range filenames {
		go func(infile string) {
			// Printf, not Println: Println does not interpret the %s verb.
			log.Printf("Start processing %s", infile)
			_, err := thumbnail.ImageFile(infile)
			errCh <- err
		}(f)
	}
	for range filenames {
		if err := <-errCh; err != nil {
			log.Println(err)
			return err
		}
	}
	return nil
}
但是这里有个bug:当makeThumbnails4遇到第1个error返回后,channel errCh就没有接收者了。其他仍在运行的worker goroutine会永远阻塞在发送操作上,从而导致goroutine泄漏。
这可能会导致整个程序卡住或者出现out of memory的错误。
- 解决办法就是采用带缓冲的channel。 我们在前面也说过,带缓冲的channel可以去掉channel发送和接收端的耦合。
// makeThumbnails4 generates thumbnails for filenames in parallel and returns
// the first error reported by any worker, or nil if all succeed.
//
// errCh has capacity len(filenames). Each worker sends exactly once, so no
// send can block even after an early return, and no goroutine leaks.
func makeThumbnails4(filenames []string) error {
	errCh := make(chan error, len(filenames))
	for _, f := range filenames {
		go func(infile string) {
			// Printf, not Println: Println does not interpret the %s verb.
			log.Printf("Start processing %s", infile)
			_, err := thumbnail.ImageFile(infile)
			errCh <- err
		}(f)
	}
	for range filenames {
		if err := <-errCh; err != nil {
			log.Println(err)
			return err
		}
	}
	return nil
}
- 假如我们想启动多个goroutine,但又事先不知道goroutine的个数,应该怎么管理它们呢?又怎么知道启动了多少个goroutine、又有多少个已经结束了呢?
解决办法就是sync.WaitGroup
package main
import (
"fmt"
"gopl.io/ch8/thumbnail"
"log"
"os"
"sync"
)
// makeThumbnails creates a thumbnail for each file sequentially, one after
// another. Errors returned by thumbnail.ImageFile are ignored.
func makeThumbnails(filenames []string) {
	for i := 0; i < len(filenames); i++ {
		thumbnail.ImageFile(filenames[i])
	}
}
// makeThumbnails1 (incorrect by design) starts one goroutine per file and
// returns immediately without waiting for any of them. The errors returned by
// thumbnail.ImageFile are discarded. If the caller (e.g. main) returns right
// afterwards, unfinished workers are terminated.
func makeThumbnails1(filenames []string) {
for _, filename := range filenames {
go thumbnail.ImageFile(filename)
}
}
// makeThumbnails3 generates thumbnails for all filenames in parallel and waits
// until every worker has finished. Errors are logged, not returned.
func makeThumbnails3(filenames []string) {
	done := make(chan struct{})
	// The file name is passed as an argument so each goroutine works on its
	// own copy rather than on a shared loop variable.
	process := func(infile string) {
		log.Printf("Start process image %s\n", infile)
		_, err := thumbnail.ImageFile(infile)
		if err != nil {
			log.Println(err)
		}
		done <- struct{}{}
	}
	for _, name := range filenames {
		go process(name)
	}
	// Wait for exactly one completion signal per launched goroutine.
	for pending := len(filenames); pending > 0; pending-- {
		<-done
	}
}
// makeThumbnails4 generates thumbnails for filenames in parallel and returns
// the first error reported by any worker, or nil if all succeed.
//
// errCh has capacity len(filenames). Each worker sends exactly once, so no
// send can block even after an early return, and no goroutine leaks.
func makeThumbnails4(filenames []string) error {
	errCh := make(chan error, len(filenames))
	for _, f := range filenames {
		go func(infile string) {
			// Printf, not Println: Println does not interpret the %s verb.
			log.Printf("Start processing %s", infile)
			_, err := thumbnail.ImageFile(infile)
			errCh <- err
		}(f)
	}
	for range filenames {
		if err := <-errCh; err != nil {
			log.Println(err)
			return err
		}
	}
	return nil
}
// makeThumbnails6 generates thumbnails for filenames in parallel and returns
// the total size in bytes of the thumbnail files that were created. A file
// whose thumbnail cannot be generated or stat'ed is logged and contributes
// nothing to the total; previously a failure left info nil and panicked.
//
// The number of workers is not needed up front: a WaitGroup counts them, and
// a separate closer goroutine closes sizes once all workers are done, which
// ends the range loop below.
func makeThumbnails6(filenames []string) int64 {
	sizes := make(chan int64)
	var wg sync.WaitGroup
	for _, f := range filenames {
		wg.Add(1)
		go func(infile string) {
			defer wg.Done()
			thumb, err := thumbnail.ImageFile(infile)
			if err != nil {
				log.Println(err)
				return
			}
			info, err := os.Stat(thumb)
			if err != nil {
				log.Println(err)
				return
			}
			sizes <- info.Size()
		}(f)
	}

	// Closer: wg.Wait must not run on this goroutine. Before the loop it would
	// deadlock against the unbuffered sends; after the loop the loop would
	// never end because sizes would never be closed.
	go func() {
		wg.Wait()
		close(sizes)
	}()

	var total int64
	for size := range sizes {
		total += size
	}
	return total
}
// main makes thumbnails for every file named on the command line and prints
// the combined size of the generated thumbnail files.
func main() {
	total := makeThumbnails6(os.Args[1:])
	fmt.Printf("The sizes of files is %d\n", total)
}
如上实现了统计变换后图片的大小。
那么现在就有个问题,为什么要把wg.Wait
放到goroutine里?
假如不放到goroutine中,就需要考虑放在for size := range ch
循环的前面还是后面?
假如放在循环之前,则程序永远都不会结束。因为对于ch走不到接收的地方,则send端会一直block住,从而形成dead lock.
如果放在之后,则也会形成dead lock, 因为worker goroutine发送完之后就退出了,就永远没有send了,则ch则会一直block在接收端。
因此,就需要将close操作也放到一个额外的goroutine中。
上面的程序代码结构, 是当我们使用并发循环,但又不知道迭代次数时很通常而且很地道的写法。
基于select的多路复用
package main
import (
"fmt"
"os"
"time"
)
// launch announces the launch by printing "LAUNCH" on its own line.
func launch() {
	const message = "LAUNCH"
	fmt.Println(message)
}
// main counts down from 10, printing each step and waiting for a one-second
// tick between steps, then calls launch. If a byte arrives on stdin during the
// countdown (with a line-buffered terminal, presumably after Enter), it prints
// an abort message and returns without launching.
func main() {
	abort := make(chan struct{})
	watchStdin := func() {
		os.Stdin.Read(make([]byte, 1)) // blocks until at least one byte is read
		abort <- struct{}{}
	}
	go watchStdin()

	tick := time.Tick(1 * time.Second)
	countdown := 10
	for countdown > 0 {
		fmt.Println(countdown)
		// Case order is irrelevant: if both are ready, select picks one at random.
		select {
		case <-tick:
			fmt.Println("Time out")
		case <-abort:
			fmt.Println("Rcv abort signal")
			return
		}
		countdown--
	}
	launch()
}
package main
import (
"fmt"
"time"
)
// main alternates sends and receives through a select on a channel of
// capacity 1. When the buffer is empty only the send case is ready, and when
// it is full only the receive case is ready. So values are sent on even
// iterations and received on the following odd iteration.
func main() {
	ch := make(chan int, 1) // capacity 1 is what makes the sequence deterministic
	i := 0
	for i < 10 {
		select {
		case x := <-ch:
			fmt.Printf("Rcv %d\n", x)
		case ch <- i:
			fmt.Printf("Send %d\n", i)
		}
		time.Sleep(1 * time.Second)
		i++
	}
}
当channel的容量是1时,接收到的值永远是偶数(Send 0、Rcv 0、Send 2、Rcv 2……):缓冲区为空时只有发送case就绪,缓冲区满时只有接收case就绪。
当channel容量大于1时,结果就不确定了,因为当select的多个case同时就绪时,会随机选择其中1个执行。