Go语言TCP Socket编程(上):https://developer.aliyun.com/article/1490865
conn.Write
和读相比,Write遇到的情形一样不少,我们也逐一看一下。
1、成功写
前面例子着重于Read,client端在Write时并未判断Write的返回值。所谓“成功写”指的就是Write调用返回的n与预期要写入的数据长度相等,且error == nil
。这是我们在调用Write时遇到的最常见的情形,这里不再举例了。
2、写阻塞
TCP连接通信两端的OS都会为该连接保留数据缓冲,一端调用Write后,实际上数据是写入到OS的协议栈的数据缓冲的。TCP是全双工通信,因此每个方向都有独立的数据缓冲。当发送方将对方的接收缓冲区以及自身的发送缓冲区写满后,Write就会阻塞。我们来看一个例子:client5.go和server5.go。
:::details
package main

import (
	"log"
	"net"
	"time"
)

// isTimeout reports whether err is a net.Error whose Timeout method
// returns true.
func isTimeout(err error) bool {
	netErr, ok := err.(net.Error)
	return ok && netErr.Timeout()
}

// handleConn plays the role of a deliberately slow reader: it waits 10
// seconds, then loops forever, sleeping 5 seconds before each Read of up
// to 60000 bytes. The connection is closed when the function returns.
//
// Note that a read error never terminates the loop: a timeout skips the
// "read ... bytes" log line, and any other error is logged and the loop
// simply goes on to the next iteration.
func handleConn(c net.Conn) {
	defer c.Close()

	// Initial pause during which nothing is read from the connection.
	time.Sleep(10 * time.Second)

	for {
		time.Sleep(5 * time.Second)

		chunk := make([]byte, 60000)
		log.Println("start to read from conn")
		got, readErr := c.Read(chunk)
		if readErr != nil {
			log.Printf("conn read %d bytes, error: %s", got, readErr)
			if isTimeout(readErr) {
				continue
			}
		}
		log.Printf("read %d bytes, content is %s\n", got, string(chunk[:got]))
	}
}

// main listens on TCP port 8001 and serves accepted connections one at a
// time: handleConn is called synchronously, so the next Accept only
// happens once the current handler returns.
func main() {
	ln, err := net.Listen("tcp", ":8001")
	if err != nil {
		log.Println("listen occurs an error: ", err)
		return
	}

	for {
		peer, acceptErr := ln.Accept()
		if acceptErr != nil {
			log.Println("accept occurs error: ", acceptErr)
			continue
		}
		handleConn(peer)
	}
}
package main

import (
	"log"
	"net"
	"time"
)

// main connects to the local server on port 8001 and writes 65536-byte
// chunks as fast as Write allows, logging the size of every write and the
// running total. The loop stops at the first write error; the bytes
// reported by that failing Write are still added to the total. Afterwards
// the process sleeps so the connection stays open.
func main() {
	log.Println("begin dial...")
	conn, err := net.Dial("tcp", ":8001")
	if err != nil {
		log.Println("dial error:", err)
		return
	}
	defer conn.Close()
	log.Println("dial ok")

	payload := make([]byte, 65536)
	sent := 0
	for {
		written, writeErr := conn.Write(payload)
		// Count the bytes first: a failed Write may still have written some.
		sent += written
		if writeErr != nil {
			log.Printf("write %d bytes, error:%s\n", written, writeErr)
			break
		}
		log.Printf("write %d bytes this time, %d bytes in total\n", written, sent)
	}

	log.Printf("write %d bytes in total\n", sent)
	time.Sleep(time.Second * 10000)
}
结果
> go run .\server5.go 2022/04/20 22:46:13 start to read from conn 2022/04/20 22:46:13 read 60000 bytes, content is 2022/04/20 22:46:18 start to read from conn 2022/04/20 22:46:18 read 60000 bytes, content is 2022/04/20 22:46:23 start to read from conn 2022/04/20 22:46:23 read 60000 bytes, content is 2022/04/20 22:46:28 start to read from conn 2022/04/20 22:46:28 read 60000 bytes, content is 2022/04/20 22:46:33 start to read from conn 2022/04/20 22:46:33 read 60000 bytes, content is 2022/04/20 22:46:38 start to read from conn 2022/04/20 22:46:38 read 60000 bytes, content is 2022/04/20 22:46:43 start to read from conn 2022/04/20 22:46:43 read 60000 bytes, content is 2022/04/20 22:46:48 start to read from conn 2022/04/20 22:46:48 read 60000 bytes, content is exit status 0xc000013a
> go run .\client5.go 2022/04/20 22:45:58 begin dial... 2022/04/20 22:45:58 dial ok 2022/04/20 22:45:58 write 65536 bytes this time, 65536 bytes in total ... 2022/04/20 22:46:18 write 65536 bytes this time, 4390912 bytes in total 2022/04/20 22:46:18 write 65536 bytes this time, 4456448 bytes in total 2022/04/20 22:46:18 write 65536 bytes this time, 4521984 bytes in total 2022/04/20 22:46:18 write 65536 bytes this time, 4587520 bytes in total 2022/04/20 22:46:18 write 65536 bytes this time, 4653056 bytes in total 2022/04/20 22:46:50 write 0 bytes, error:write tcp 127.0.0.1:49307->127.0.0.1:8001: wsasend: An existing connection was forcibly closed by the remote host. 2022/04/20 22:46:50 write 4653056 bytes in total exit status 0xc000013a
Server5在连接建立后的前15s(先Sleep 10s,进入循环后在第一次Read之前又Sleep 5s)中并不Read数据,因此当client5一直尝试写入时,写到一定量后就会发生阻塞
:::
在Darwin上,这个size大约在679468bytes。后续当server5每隔5s进行Read时,OS socket缓冲区腾出了空间,client5就又可以写入了:
3、写入部分数据
Write操作存在写入部分数据的情况,比如上面例子中,当client端输出日志停留在“write 65536 bytes this time, 655360 bytes in total”时,我们杀掉server5,这时我们会看到client5输出以下日志:
... > go run .\client5.go 2022/04/20 22:55:03 begin dial... 2022/04/20 22:55:03 dial ok ... 2022/04/20 22:55:03 write 65536 bytes this time, 2293760 bytes in total 2022/04/20 22:55:06 write 0 bytes, error:write tcp 127.0.0.1:50077->127.0.0.1:8001: wsasend: An existing connection was forcibly closed by the remote host. 2022/04/20 22:55:06 write 2293760 bytes in total
测试了很多次,并没有出现以下结果,可能是因为版本的问题。
显然Write并非在65536这个地方阻塞的,而是后续又写入很多数据后发生了阻塞,server端socket关闭后,我们看到Write返回err != nil且n = 24108,程序需要对这部分写入的24108字节做特定处理。
4、写入超时
如果非要给Write增加一个期限,那我们可以调用SetWriteDeadline方法。
:::details
package main

import (
	"log"
	"net"
	"time"
)

// isTimeout reports whether err is a net.Error whose Timeout method
// returns true.
func isTimeout(err error) bool {
	netErr, ok := err.(net.Error)
	return ok && netErr.Timeout()
}

// handleConn plays the role of a deliberately slow reader: it waits 10
// seconds, then loops forever, sleeping 5 seconds before each Read of up
// to 60000 bytes. The connection is closed when the function returns.
//
// Note that a read error never terminates the loop: a timeout skips the
// "read ... bytes" log line, and any other error is logged and the loop
// simply goes on to the next iteration.
func handleConn(c net.Conn) {
	defer c.Close()

	// Initial pause during which nothing is read from the connection.
	time.Sleep(10 * time.Second)

	for {
		time.Sleep(5 * time.Second)

		chunk := make([]byte, 60000)
		log.Println("start to read from conn")
		got, readErr := c.Read(chunk)
		if readErr != nil {
			log.Printf("conn read %d bytes, error: %s", got, readErr)
			if isTimeout(readErr) {
				continue
			}
		}
		log.Printf("read %d bytes, content is %s\n", got, string(chunk[:got]))
	}
}

// main listens on TCP port 8001 and serves accepted connections one at a
// time: handleConn is called synchronously, so the next Accept only
// happens once the current handler returns.
func main() {
	ln, err := net.Listen("tcp", ":8001")
	if err != nil {
		log.Println("listen occurs an error: ", err)
		return
	}

	for {
		peer, acceptErr := ln.Accept()
		if acceptErr != nil {
			log.Println("accept occurs error: ", acceptErr)
			continue
		}
		handleConn(peer)
	}
}
package main

import (
	"log"
	"net"
	"time"
)

// main connects to the local server on port 8001 and repeatedly writes
// 65536-byte chunks, arming a 10-microsecond write deadline before every
// Write and pausing 100ms between writes. The loop ends at the first
// error, either from setting the deadline or from Write itself, and the
// total number of bytes written is logged before exiting.
func main() {
	log.Println("begin dial...")
	conn, err := net.Dial("tcp", ":8001")
	if err != nil {
		log.Println("dial error:", err)
		return
	}
	defer conn.Close()
	log.Println("dial ok")

	data := make([]byte, 65536)
	var total int
	for {
		// Set the write timeout. The deadline is an absolute point in time,
		// so it has to be re-armed before each Write. A failure to set it is
		// reported instead of being silently dropped, because the following
		// Write would otherwise run without the intended time limit.
		if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Microsecond)); err != nil {
			log.Println("set write deadline error:", err)
			break
		}

		n, err := conn.Write(data)
		// Count the bytes even on error: a failed Write may be partial.
		total += n
		if err != nil {
			log.Printf("write %d bytes, error:%s\n", n, err)
			break
		}
		log.Printf("write %d bytes this time, %d bytes in total\n", n, total)
		time.Sleep(100 * time.Millisecond)
	}
	log.Printf("write %d bytes in total\n", total)
}
:::
启动server6.go,启动client6.go,我们可以看到写入超时的情况下,Write的返回结果:
:::details
> go run .\client6.go 2022/04/20 23:02:50 begin dial... 2022/04/20 23:02:50 dial ok 2022/04/20 23:02:50 write 65536 bytes this time, 65536 bytes in total 2022/04/20 23:02:50 write 65536 bytes this time, 131072 bytes in total 2022/04/20 23:02:50 write 65536 bytes this time, 196608 bytes in total ... 2022/04/20 23:02:53 write 65536 bytes this time, 2228224 bytes in total 2022/04/20 23:02:53 write 65536 bytes this time, 2293760 bytes in total 2022/04/20 23:02:54 write 0 bytes, error:write tcp 127.0.0.1:50553->127.0.0.1:8001: i/o timeout 2022/04/20 23:02:54 write 2293760 bytes in total
> go run .\server5.go 2022/04/20 23:03:05 start to read from conn 2022/04/20 23:03:05 read 60000 bytes, content is 2022/04/20 23:03:10 start to read from conn 2022/04/20 23:03:10 read 60000 bytes, content is ... 2022/04/20 23:05:05 start to read from conn 2022/04/20 23:05:05 read 60000 bytes, content is 2022/04/20 23:05:10 start to read from conn 2022/04/20 23:05:10 conn read 0 bytes, error: read tcp 127.0.0.1:8001->127.0.0.1:50553: wsarecv: An existing connection was forcibly closed by the remote host. 2022/04/20 23:05:10 read 0 bytes, content is 2022/04/20 23:05:15 start to read from conn ... 2022/04/20 23:06:25 start to read from conn 2022/04/20 23:06:25 conn read 0 bytes, error: read tcp 127.0.0.1:8001->127.0.0.1:50553: wsarecv: An existing connection was forcibly closed by the remote host. 2022/04/20 23:06:25 read 0 bytes, content is exit status 0xc000013a
可以看到在写入超时时,依旧存在部分数据写入的情况。
:::
综上例子,虽然Go
给我们提供了阻塞I/O
的便利,但在调用 Read
和 Write
时依旧要综合考虑方法返回的n
和err
的结果,以做出正确处理。net.conn
实现了io.Reader
和io.Writer
接口,因此可以使用一些wrapper包进行socket
读写,比如bufio
包下面的Writer
和Reader
、io/ioutil
下的函数等。
Goroutine safe
基于goroutine的网络架构模型,存在在不同goroutine间共享conn的情况,那么conn的读写是否是goroutine safe
的呢?在深入这个问题之前,我们先从应用意义上来看read操作和write操作的goroutine-safe必要性。
对于read操作而言,由于 TCP 是面向字节流,conn.Read 无法正确区分数据的业务边界,因此多个goroutine对同一个conn进行read的意义不大,goroutine读到不完整的业务包反倒是增加了业务处理的难度。对于 Write 操作而言,倒是有多个goroutine并发写的情况。
不过conn读写是否goroutine-safe的测试不是很好做,我们先深入一下runtime代码,先从理论上给这个问题定个性:
:::tip
源码位置
go/net.go at master · golang/go (github.com)
:::
:::details
net.conn
只是*netFD
的wrapper结构,最终Write和Read都会落在其中的fd上:
// conn is the unexported connection struct quoted from package net. The
// only field shown is the *netFD it wraps.
type conn struct {
	fd *netFD
}
netFD在不同平台上有着不同的实现,我们以go/fd_plan9.go at master · golang/go (github.com)中的netFD为例:
// Network file descriptor.
type netFD struct {
	pfd poll.FD

	// immutable until Close
	net               string
	n                 string
	dir               string
	listen, ctl, data *os.File
	laddr, raddr      Addr
	isStream          bool
}

...

// ok reports whether fd is non-nil and still has its ctl file.
func (fd *netFD) ok() bool { return fd != nil && fd.ctl != nil }

// destroy closes the ctl, data and listen files (those that are set) and
// then clears all three fields. It is a no-op when fd.ok() is false.
func (fd *netFD) destroy() {
	if !fd.ok() {
		return
	}
	err := fd.ctl.Close()
	if fd.data != nil {
		// Keep only the first Close error encountered.
		if err1 := fd.data.Close(); err1 != nil && err == nil {
			err = err1
		}
	}
	if fd.listen != nil {
		if err1 := fd.listen.Close(); err1 != nil && err == nil {
			err = err1
		}
	}
	fd.ctl = nil
	fd.data = nil
	fd.listen = nil
}

// Read returns syscall.EINVAL when fd is not ok or has no data file.
// Otherwise it delegates to fd.pfd.Read, handing it fd.data.Read as the
// underlying read function.
func (fd *netFD) Read(b []byte) (n int, err error) {
	if !fd.ok() || fd.data == nil {
		return 0, syscall.EINVAL
	}
	n, err = fd.pfd.Read(fd.data.Read, b)
	// For "udp", io.EOF is converted into a successful zero-byte read.
	if fd.net == "udp" && err == io.EOF {
		n = 0
		err = nil
	}
	return
}

// Write returns syscall.EINVAL when fd is not ok or has no data file.
// Otherwise it delegates to fd.pfd.Write, handing it fd.data.Write as the
// underlying write function.
func (fd *netFD) Write(b []byte) (n int, err error) {
	if !fd.ok() || fd.data == nil {
		return 0, syscall.EINVAL
	}
	return fd.pfd.Write(fd.data.Write, b)
}

...
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
	// Lock sysfd and serialize access to Read and Write methods.
	fdmu fdMutex

	// System file descriptor. Immutable until Close.
	Sysfd int

	// I/O poller.
	pd pollDesc

	// Writev cache.
	iovecs *[]syscall.Iovec

	// Semaphore signaled when file is closed.
	csema uint32

	// Non-zero if this file has been set to blocking mode.
	isBlocking uint32

	// Whether this is a streaming descriptor, as opposed to a
	// packet-based descriptor like a UDP socket. Immutable.
	IsStream bool

	// Whether a zero byte read indicates EOF. This is false for a
	// message based socket connection.
	ZeroReadIsEOF bool

	// Whether this is a file rather than a network socket.
	isFile bool
}
我们看到poll.FD
中包含了fdMutex
类型字段,从注释上来看,该fdMutex
用来串行化对该netFD
对应的sysfd
的Write
和Read
操作。从这个注释上来看,所有对conn
的Read
和Write
操作都是有fdMutex
互斥的,下面poll.FD的Read和Write方法的实现也证实了这一点:
// Read implements io.Reader.
//
// The read lock is acquired first and released by the deferred
// readUnlock, so everything below runs while holding it.
func (fd *FD) Read(p []byte) (int, error) {
	if err := fd.readLock(); err != nil {
		return 0, err
	}
	defer fd.readUnlock()
	if len(p) == 0 {
		// If the caller wanted a zero byte read, return immediately
		// without trying (but after acquiring the readLock).
		// Otherwise syscall.Read returns 0, nil which looks like
		// io.EOF.
		// TODO(bradfitz): make it wait for readability? (Issue 15735)
		return 0, nil
	}
	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, err
	}
	// For stream descriptors a single read is capped at maxRW bytes.
	if fd.IsStream && len(p) > maxRW {
		p = p[:maxRW]
	}
	for {
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err != nil {
			n = 0
			// EAGAIN on a pollable descriptor: wait via waitRead and retry
			// the syscall if the wait itself succeeded.
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		// Any other outcome ends the call: Read returns after one
		// completed syscall and does not loop to fill p.
		err = fd.eofError(n, err)
		return n, err
	}
}
write
go/fd_unix.go at master · golang/go (github.com)
// Write implements io.Writer.
//
// The write lock is acquired first and released by the deferred
// writeUnlock, so the whole loop below, which keeps going until all of p
// is written or an error occurs, runs while holding it.
func (fd *FD) Write(p []byte) (int, error) {
	if err := fd.writeLock(); err != nil {
		return 0, err
	}
	defer fd.writeUnlock()
	if err := fd.pd.prepareWrite(fd.isFile); err != nil {
		return 0, err
	}
	// nn is the number of bytes of p written so far.
	var nn int
	for {
		// For stream descriptors each syscall writes at most maxRW bytes.
		max := len(p)
		if fd.IsStream && max-nn > maxRW {
			max = nn + maxRW
		}
		n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
		if n > 0 {
			nn += n
		}
		if nn == len(p) {
			return nn, err
		}
		// EAGAIN on a pollable descriptor: wait via waitWrite and retry
		// if the wait itself succeeded.
		if err == syscall.EAGAIN && fd.pd.pollable() {
			if err = fd.pd.waitWrite(fd.isFile); err == nil {
				continue
			}
		}
		// On error the partial count nn is returned with the error.
		if err != nil {
			return nn, err
		}
		if n == 0 {
			return nn, io.ErrUnexpectedEOF
		}
	}
}
:::
每次Write
操作都是受lock
保护,直到此次数据全部write
完。因此在应用层面,要想保证多个Goroutine
在一个conn
上write
操作的Safe
,需要一次write
完整写入一个“业务包”;一旦将业务包的写入拆分为多次write
,
那就无法保证某个Goroutine
的某“业务包”数据在conn
发送的连续性。
同时也可以看出即便是Read
操作,也是lock
保护的。多个Goroutine
对同一conn
的并发读不会出现读出内容重叠的情况,但内容断点是依 runtime
调度来随机确定的。
存在一个业务包数据,1/3内容被goroutine-1
读走,另外2/3被另外一个goroutine-2
读走的情况。比如一个完整包:world
,当goroutine的read slice size < 5
时,
存在可能:一个goroutine
读到 “worl
”,另外一个goroutine
读出”d
”。
四、Socket属性
原生Socket API
提供了丰富的socket
设置接口,但Golang
有自己的网络架构模型,golang
提供的socket options
接口也是基于上述模型的必要的属性设置。包括
- SetKeepAlive
- SetKeepAlivePeriod
- SetLinger
- SetNoDelay (默认no delay)
- SetWriteBuffer
- SetReadBuffer
比如posix 的socket option
,go/sockopt_posix.go at master · golang/go (github.com)
不过上面的Method是TCPConn的,而不是Conn的,要使用上面的Method,需要type assertion:
// The socket-option setters are methods of *net.TCPConn, not of the
// net.Conn interface, so the concrete type has to be asserted first.
// Outside package net the type must be written with its package qualifier.
tcpConn, ok := c.(*net.TCPConn)
if !ok {
	// error handle (do not fall through: tcpConn is nil when the assertion fails)
}
tcpConn.SetNoDelay(true)
对于listener socket, golang默认采用了 SO_REUSEADDR,这样当你重启 listener程序时,不会因为address in use的错误而启动失败。而listen backlog
的默认值是通过获取系统的设置值得到的。不同系统不同:mac 128, linux 512等。
比如linux的backlog
:::details
// Linux stores the backlog as:
//
//   - uint16 in kernel version < 4.1,
//   - uint32 in kernel version >= 4.1
//
// Truncate number to avoid wrapping.
//
// See issue 5030 and 41470.
func maxAckBacklog(n int) int {
	major, minor := kernelVersion()
	// Width in bits of the backlog field for the running kernel.
	size := 16
	if major > 4 || (major == 4 && minor >= 1) {
		size = 32
	}

	// Clamp n to the largest value representable in size bits.
	var max uint = 1<<size - 1
	if uint(n) > max {
		n = int(max)
	}
	return n
}

// maxListenerBacklog returns the value read from
// /proc/sys/net/core/somaxconn. It falls back to syscall.SOMAXCONN when
// the file cannot be opened or read, when the first field does not parse,
// or when the parsed value is 0. Values above 1<<16-1 are passed through
// maxAckBacklog before being returned.
func maxListenerBacklog() int {
	fd, err := open("/proc/sys/net/core/somaxconn")
	if err != nil {
		return syscall.SOMAXCONN
	}
	defer fd.close()
	l, ok := fd.readLine()
	if !ok {
		return syscall.SOMAXCONN
	}
	f := getFields(l)
	n, _, ok := dtoi(f[0])
	if n == 0 || !ok {
		return syscall.SOMAXCONN
	}

	if n > 1<<16-1 {
		return maxAckBacklog(n)
	}
	return n
}
:::
至于backlog
是啥,可以参考这篇文章,搜索了挺久才找到的。
使用Go和C实例来探究Linux TCP之listen backlog_Tw!light的博客-CSDN博客
简单理解了一下,博客中提到 backlog
是“操作系统层面的套接字队列长度”,更准确地说,它限制的是已完成三次握手、等待应用程序调用Accept取走的连接队列的长度,而不是服务器可以接受的最大连接数。
五、关闭连接
和前面的方法相比,关闭连接算是最简单的操作了。由于socket
是全双工的,client
和server
端在己方已关闭的socket
和对方关闭的socket
上操作的结果有不同。看下面例子:
:::details
package main

import (
	"log"
	"net"
)

// handleConn performs exactly one Read (into a 10-byte buffer) followed by
// one Write of that whole buffer, logging the outcome of each step, and
// closes the connection on return. The Write is attempted even when the
// Read failed.
func handleConn(c net.Conn) {
	defer c.Close()

	scratch := make([]byte, 10)

	// Step 1: read from the connection.
	log.Println("start to read from conn")
	count, ioErr := c.Read(scratch)
	if ioErr == nil {
		log.Printf("read %d bytes, content is %s\n", count, string(scratch[:count]))
	} else {
		log.Println("conn read error:", ioErr)
	}

	// Step 2: write the full buffer back, regardless of how step 1 went.
	count, ioErr = c.Write(scratch)
	if ioErr == nil {
		log.Printf("write %d bytes, content is %s\n", count, string(scratch[:count]))
	} else {
		log.Println("conn write error:", ioErr)
	}
}

// main listens on TCP port 8888 and handles accepted connections one at a
// time. It exits without logging if Listen or Accept fails.
func main() {
	ln, err := net.Listen("tcp", ":8888")
	if err != nil {
		return
	}
	log.Println("start to listen")

	for {
		peer, acceptErr := ln.Accept()
		if acceptErr != nil {
			return
		}
		log.Println("a new connection accept")
		handleConn(peer)
	}
}
package main

import (
	"log"
	"net"
	"time"
)

// main dials the local server on port 8888, closes the connection right
// away, and then attempts one Read and one Write on the already-closed
// connection to show which errors they return. It sleeps afterwards so
// the process stays alive.
func main() {
	log.Println("begin dial...")
	conn, err := net.Dial("tcp", ":8888")
	if err != nil {
		log.Println("dial error:", err)
		return
	}
	// Close on purpose before any I/O: the calls below exercise a socket
	// that this side has already closed.
	conn.Close()
	log.Println("close ok")

	var buf = make([]byte, 32)
	n, err := conn.Read(buf)
	if err != nil {
		log.Println("read error:", err)
	} else {
		// %d, not "% b": the latter is the binary verb with a space flag
		// and would print n in base 2 followed by the literal "ytes".
		log.Printf("read %d bytes, content is %s\n", n, string(buf[:n]))
	}

	n, err = conn.Write(buf)
	if err != nil {
		log.Println("write error:", err)
	} else {
		log.Printf("write %d bytes, content is %s\n", n, string(buf[:n]))
	}

	time.Sleep(time.Second * 1000)
}
上述例子的执行结果如下:
> go run .\server.go 2022/04/21 00:29:04 start to listen 2022/04/21 00:29:13 a new connection accept 2022/04/21 00:29:13 start to read from conn 2022/04/21 00:29:13 conn read error: EOF 2022/04/21 00:29:13 write 10 bytes, content is 2022/04/21 00:29:37 a new connection accept 2022/04/21 00:29:37 start to read from conn 2022/04/21 00:29:37 conn read error: EOF 2022/04/21 00:29:37 write 10 bytes, content is exit status 0xc000013a
> go run .\client.go 2022/04/21 00:29:13 begin dial... 2022/04/21 00:29:13 close ok 2022/04/21 00:29:13 read error: read tcp 127.0.0.1:61459->127.0.0.1:8888: use of closed network connection 2022/04/21 00:29:13 write error: write tcp 127.0.0.1:61459->127.0.0.1:8888: use of closed network connection exit status 0xc000013a > go run .\client.go 2022/04/21 00:29:37 begin dial... 2022/04/21 00:29:37 close ok 2022/04/21 00:29:37 read error: read tcp 127.0.0.1:61534->127.0.0.1:8888: use of closed network connection 2022/04/21 00:29:37 write error: write tcp 127.0.0.1:61534->127.0.0.1:8888: use of closed network connection exit status 0xc000013a
:::
从client1的结果来看,在己方已经关闭的socket
上再进行read
和write
操作,会得到”use of closed network connection
” error;从server
的执行结果来看,在对方关闭的socket
上执行read
操作会得到EOF error
,但write
操作会成功,因为数据会成功写入己方的内核socket
缓冲区中,即便最终发不到对方socket
缓冲区了,因为己方socket
并未关闭。因此当发现对方socket
关闭后,己方应该正确合理处理自己的socket
,再继续write
已经无任何意义了。
比如,EOF
:::details
package main

import (
	"errors"
	"io"
	"log"
	"net"
)

// handleConn performs one Read into a 10-byte buffer. If the Read reports
// io.EOF it logs that and returns without writing anything. For any other
// outcome (success or a different error) it logs the result and then
// writes the whole buffer back, logging the outcome of the Write. The
// connection is closed on return.
func handleConn(c net.Conn) {
	defer c.Close()

	scratch := make([]byte, 10)

	log.Println("start to read from conn")
	count, ioErr := c.Read(scratch)
	switch {
	case ioErr == nil:
		log.Printf("read %d bytes, content is %s\n", count, string(scratch[:count]))
	case errors.Is(ioErr, io.EOF):
		// The read reported EOF: stop here instead of attempting a write.
		log.Println("EOF occur----")
		return
	default:
		log.Println("conn read error:", ioErr)
	}

	count, ioErr = c.Write(scratch)
	if ioErr == nil {
		log.Printf("write %d bytes, content is %s\n", count, string(scratch[:count]))
	} else {
		log.Println("conn write error:", ioErr)
	}
}

// main listens on TCP port 8888 and handles accepted connections one at a
// time. It exits without logging if Listen or Accept fails.
func main() {
	ln, err := net.Listen("tcp", ":8888")
	if err != nil {
		return
	}
	log.Println("start to listen")

	for {
		peer, acceptErr := ln.Accept()
		if acceptErr != nil {
			return
		}
		log.Println("a new connection accept")
		handleConn(peer)
	}
}
package main

import (
	"log"
	"net"
	"time"
)

// main dials the local server on port 8888, closes the connection right
// away, and then attempts one Read and one Write on the already-closed
// connection to show which errors they return. It sleeps afterwards so
// the process stays alive.
func main() {
	log.Println("begin dial...")
	conn, err := net.Dial("tcp", ":8888")
	if err != nil {
		log.Println("dial error:", err)
		return
	}
	// Close on purpose before any I/O: the calls below exercise a socket
	// that this side has already closed.
	conn.Close()
	log.Println("close ok")

	var buf = make([]byte, 32)
	n, err := conn.Read(buf)
	if err != nil {
		log.Println("read error:", err)
	} else {
		// %d, not "% b": the latter is the binary verb with a space flag
		// and would print n in base 2 followed by the literal "ytes".
		log.Printf("read %d bytes, content is %s\n", n, string(buf[:n]))
	}

	n, err = conn.Write(buf)
	if err != nil {
		log.Println("write error:", err)
	} else {
		log.Printf("write %d bytes, content is %s\n", n, string(buf[:n]))
	}

	time.Sleep(time.Second * 1000)
}
输出结果
> go run .\server.go 2022/04/21 00:53:18 start to listen 2022/04/21 00:53:24 a new connection accept 2022/04/21 00:53:24 start to read from conn 2022/04/21 00:53:24 EOF occur----
> go run .\client.go 2022/04/21 00:53:24 begin dial... 2022/04/21 00:53:24 close ok 2022/04/21 00:53:24 read error: read tcp 127.0.0.1:63470->127.0.0.1:8888: use of closed network connection 2022/04/21 00:53:24 write error: write tcp 127.0.0.1:63470->127.0.0.1:8888: use of closed network connection exit status 0xc000013a
从输出结果来看,在遭遇EOF
之后,server
不再write
,避免了server buf
的浪费。
:::
六、小结
本文比较基础,但却很重要,毕竟golang是面向大规模服务后端的,对通信环节的细节的深入理解会大有裨益。另外Go的goroutine+阻塞通信的网络通信模型降低了开发者心智负担,简化了通信的复杂性,这点尤为重要。