HttpClient源码阅读#
DemoCode#
func main() { // 创建连接池 // 创建客户端,绑定连接池 // 发送请求 // 读取响应 transport := &http.Transport{ DialContext: (&net.Dialer{ Timeout: 30 * time.Second, // 连接超时 KeepAlive: 30 * time.Second, // 长连接存活的时间 }).DialContext, // 最大空闲连接数 MaxIdleConns: 100, // 超过最大空闲连接数的连接会在IdleConnTimeout后被销毁 IdleConnTimeout: 10 * time.Second, TLSHandshakeTimeout: 10 * time.Second, // tls握手超时时间 ExpectContinueTimeout: 1 * time.Second, // 100-continue 状态码超时时间 } // 创建客户端 client := &http.Client{ Timeout: time.Second * 10, //请求超时时间 Transport: transport, } // 请求数据,获得响应 res, err := client.Get("http://localhost:8081/login") if err != nil { fmt.Printf("error : %v", err) return } defer res.Body.Close() // 处理数据 bytes, err := ioutil.ReadAll(res.Body) if err != nil { fmt.Printf("error : %v", err) return } fmt.Printf("Read from http server res:[%v]", string(bytes)) }
整理思路#
http.Client的代码其实是很多的,全部很细的过一遍肯定也会难度,下面可能也是只能提及其中的一部分。
首先明白一件事,我们编写的HttpClient是在干什么?(虽然这个问题很傻,但是总得问一下)是在发送Http请求。
一般我们在开发的时候,更多的编写的是HttpServer的代码。是在处理Http请求, 而不是去发送Http请求,Http请求都是是前端通过ajax经由浏览器发送到后端的。
其次,Http请求实际上是建立在tcp连接之上的,所以如果我们去看http.Client肯定能找到net.Dial("tcp",adds)
相关的代码。
那也就是说,我们要看看,http.Client是如何在和服务端建立连接、发送数据、接收数据的。
重要的struct#
http.Client中有机几个比较重要的struct,如下
http.Client结构体中封装了和http请求相关的属性,诸如 cookie,timeout,redirect以及Transport。
type Client struct { Transport RoundTripper CheckRedirect func(req *Request, via []*Request) error Jar CookieJar Timeout time.Duration }
Tranport实现了RoundTrpper接口:
type RoundTripper interface { // 1、RoundTrip会去执行一个简单的 Http Trancation,并为requestt返回一个响应 // 2、RoundTrip不会尝试去解析response // 3、注意:只要返回了Reponse,无论response的状态码是多少,RoundTrip返回的结果:err == nil // 4、RoundTrip将请求发送出去后,如果他没有获取到response,他会返回一个非空的err。 // 5、同样,RoundTrip不会尝试去解析诸如重定向、认证、cookie这种更高级的协议。 // 6、除了消费和关闭请求体之外,RoundTrip不会修改request的其他字段 // 7、RoundTrip可以在一个单独的gorountine中读取request的部分字段。一直到ResponseBody关闭之前,调用者都不能取消,或者重用这个request // 8、RoundTrip始终会保证关闭Body(包含在发生err时)。根据实现的不同,在RoundTrip关闭前,关闭Body这件事可能会在一个单独的goroutine中去做。这就意味着,如果调用者想将请求体用于后续的请求,必须等待知道发生Close // 9、请求的URL和Header字段必须是被初始化的。 RoundTrip(*Request) (*Response, error) }
看上面RoundTrpper接口,它里面只有一个方法RoundTrip
,方法的作用就是执行一次Http请求,发送Request然后获取Response。
RoundTrpper被设计成了一个支持并发的结构体。
Transport结构体如下:
type Transport struct { idleMu sync.Mutex // user has requested to close all idle conns wantIdle bool // Transport的作用就是用来建立一个连接,这个idleConn就是Transport维护的空闲连接池。 idleConn map[connectMethodKey][]*persistConn // most recently used at end idleConnCh map[connectMethodKey]chan *persistConn }
其中的connectMethodKey也是结构体:
type connectMethodKey struct { // proxy 代理的URL,当他不为空时,就会一直使用这个key // scheme 协议的类型, http https // addr 代理的url,也就是下游的url proxy, scheme, addr string }
persistConn是一个具体的连接实例,包含连接的上下文。
type persistConn struct { // alt可选地指定TLS NextProto RoundTripper。 // 这用于今天的HTTP / 2和以后的将来的协议。 如果非零,则其余字段未使用。 alt RoundTripper t *Transport cacheKey connectMethodKey conn net.Conn tlsState *tls.ConnectionState // 用于从conn中读取内容 br *bufio.Reader // from conn // 用于往conn中写内容 bw *bufio.Writer // to conn nwrite int64 // bytes written // 他是个chan,roundTrip会将readLoop中的内容写入到reqch中 reqch chan requestAndChan // 他是个chan,roundTrip会将writeLoop中的内容写到writech中 writech chan writeRequest closech chan struct{} // closed when conn closed
另外补充一个结构体:Request,他用来描述一次http请求的实例,它定义于http包request.go, 里面封装了对Http请求相关的属性
type Request struct { Method string URL *url.URL Proto string // "HTTP/1.0" ProtoMajor int // 1 ProtoMinor int // 0 Header Header Body io.ReadCloser GetBody func() (io.ReadCloser, error) ContentLength int64 TransferEncoding []string Close bool Host string Form url.Values PostForm url.Values MultipartForm *multipart.Form Trailer Header RemoteAddr string RequestURI string TLS *tls.ConnectionState Cancel <-chan struct{} Response *Response ctx context.Context }
这几个结构体共同完成如下图所示http.Client的工作流程
流程#
我们想发送一次Http请求。首先我们需要构造一个Request,Request本质上是对Http协议的描述(因为大家使用的都是Http协议,所以将这个Request发送到HttpServer后,HttpServer能识别并解析它)。
// 从这行代码开始往下看 res, err := client.Get("http://localhost:8081/login") // 跟进Get req, err := NewRequest("GET", url, nil) if err != nil { return nil, err } return c.Do(req) // 跟进Do func (c *Client) Do(req *Request) (*Response, error) { return c.do(req) } // 跟进do,do函数中有下面的逻辑,可以看到执行完send后已经拿到返回值了。所以我们得继续跟进send方法 if resp, didTimeout, err = c.send(req, deadline); err != nil // 跟进send方法,可以看到send中还有一send方法,入参分别是:request,tranpost,deadline // 到现在为止,我们没有看到有任何和服务端建立连接的动作发生,但是构造的req和拥有连接池的tranport已经见面了~ resp, didTimeout, err = send(req, c.transport(), deadline) // 继续跟进这个send方法,看到了调用了rt的RoundTrip方法。 // 这个rt就是我们编写HttpClient代码时创建的,绑定在http.Client上的tranport实例。 // 这个RoundTrip方法的作用我们在上面已经说过了,最直接的作用就是:发送request 并获取response。 resp, err = rt.RoundTrip(req)
但是RoundTrip他是个定义在RoundTripper接口中的抽象方法,我们看代码肯定是要去看具体的实现嘛
这里可以使用断点调试法:在上面最后一行上打上断点,会进入到他的具体实现中。从图中可以看到具体的实现在roundtrip中。
RoundTrip
中调用的函数是我们自定义的transport的roundTrip函数, 跟进去如下:
紧接着我们需要一个conn,这个conn我们通过Transport可以获取到。conn的类型为persistConn。
// roundTrip函数中又一个无限for循环 for { // 检查请求的上下文是否关闭了 select { case <-ctx.Done(): req.closeBody() return nil, ctx.Err() default: } // 对传递进来的req进行了有一层的封装,封装后的这个treq可以被roundTrip修改,所以每次重试都会新建 treq := &transportRequest{Request: req, trace: trace} cm, err := t.connectMethodForRequest(treq) if err != nil { req.closeBody() return nil, err } // 到这里真的执行从tranport中获取和对应主机的连接,这个连接可能是http、https、http代理、http代理的高速缓存, 但是无论如何我们都已经准备好了向这个连接发送treq // 这里获取出来的连接就是我们在上文中提及的persistConn pconn, err := t.getConn(treq, cm) if err != nil { t.setReqCanceler(req, nil) req.closeBody() return nil, err } var resp *Response if pconn.alt != nil { // HTTP/2 path. t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per host t.setReqCanceler(req, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { // 调用persistConn的roundTrip方法,发送treq并获取响应。 resp, err = pconn.roundTrip(treq) } if err == nil { return resp, nil } if !pconn.shouldRetryRequest(req, err) { // Issue 16465: return underlying net.Conn.Read error from peek, // as we've historically done. if e, ok := err.(transportReadFromServerError); ok { err = e.err } return nil, err } testHookRoundTripRetried() // Rewind the body if we're able to. (HTTP/2 does this itself so we only // need to do it for HTTP/1.1 connections.) if req.GetBody != nil && pconn.alt == nil { newReq := *req var err error newReq.Body, err = req.GetBody() if err != nil { return nil, err } req = &newReq } }
整理思路:然后看上面代码中获取conn和roundTrip的实现细节。
我们需要一个conn,这个conn可以通过Transport获取到。conn的类型为persistConn。但是不管怎么样,都得先获取出 persistConn,才能进一步完成发送请求再得到服务端到响应。
然后关于这个persistConn结构体其实上面已经提及过了。重新贴在下面
type persistConn struct { // alt可选地指定TLS NextProto RoundTripper。 // 这用于今天的HTTP / 2和以后的将来的协议。 如果非零,则其余字段未使用。 alt RoundTripper conn net.Conn t *Transport br *bufio.Reader // 用于从conn中读取内容 bw *bufio.Writer // 用于往conn中写内容 // 他是个chan,roundTrip会将readLoop中的内容写入到reqch中 reqch chan requestAndChan // 他是个chan,roundTrip会将writeLoop中的内容写到writech中 nwrite int64 // bytes written cacheKey connectMethodKey tlsState *tls.ConnectionState writech chan writeRequest closech chan struct{} // closed when conn closed
跟进t.getConn(treq, cm)
代码如下:
// 先尝试从空闲缓冲池中取得连接 // 所谓的空闲缓冲池就是Tranport结构体中的: idleConn map[connectMethodKey][]*persistConn // 入参位置的cm如下: /* type connectMethod struct { // 代理的url,如果没有代理的话,这个值为nil proxyURL *url.URL // 连接所使用的协议 http、https targetScheme string // 如果proxyURL指定了http代理或者是https代理,并且使用的协议是http而不是https。 // 那么下面的targetAddr就会不包含在connect method key中。 // 因为socket可以复用不同的targetAddr值 targetAddr string }*/ t.getIdleConn(cm); // 空闲缓冲池有的空闲连接的话返回conn,否则进行如下的select select { // todo 这里我还不确定是在干什么,目前猜测是这样的:每个服务器能打开的socket句柄是有限的 // 每次来获取链接的时候,我们就计数+1。当整体的句柄在Host允许范围内时我们不做任何干涉~ case <-t.incHostConnCount(cmKey): // count below conn per host limit; proceed // 重新尝试从空闲连接池中获取连接,因为可能有的连接使用完后被放回连接池了 case pc := <-t.getIdleConnCh(cm): if trace != nil && trace.GotConn != nil { trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()}) } return pc, nil // 请求是否被取消了 case <-req.Cancel: return nil, errRequestCanceledConn // 请求的上下文是否Done掉了 case <-req.Context().Done(): return nil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } return nil, err } // 开启新的gorountine新建连接一个连接 go func() { /** * 新建连接,方法底层封装了tcp client dial相关的逻辑 * conn, err := t.dial(ctx, "tcp", cm.addr()) * 以及根据不同的targetScheme构建不同的request的逻辑。 */ // 获取到persistConn pc, err := t.dialConn(ctx, cm) // 将persistConn写到chan中 dialc <- dialRes{pc, err} }() // 再尝试从空闲连接池中获取 idleConnCh := t.getIdleConnCh(cm) select { // 如果上面的go协程拨号成功了,这里就能取出值来 case v := <-dialc: // Our dial finished. if v.pc != nil { if trace != nil && trace.GotConn != nil && v.pc.alt == nil { trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn}) } return v.pc, nil } // Our dial failed. See why to return a nicer error // value. // 将Host的连接-1 t.decHostConnCount(cmKey) select { ...
transport.dialConn#
下面代码中的cm长这样
// dialConn是Transprot的方法 // 入参:context上下文, connectMethod // 出参:persisnConn func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) { // 构建将要返回的 persistConn pconn := &persistConn{ t: t, cacheKey: cm.key(), reqch: make(chan requestAndChan, 1), writech: make(chan writeRequest, 1), closech: make(chan struct{}), writeErrCh: make(chan error, 1), writeLoopDone: make(chan struct{}), } trace := httptrace.ContextClientTrace(ctx) wrapErr := func(err error) error { if cm.proxyURL != nil { // Return a typed error, per Issue 16997 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err} } return err } // 判断cm中使用的协议是否是https if cm.scheme() == "https" && t.DialTLS != nil { var err error pconn.conn, err = t.DialTLS("tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } if pconn.conn == nil { return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)")) } if tc, ok := pconn.conn.(*tls.Conn); ok { // Handshake here, in case DialTLS didn't. TLSNextProto below // depends on it for knowing the connection state. if trace != nil && trace.TLSHandshakeStart != nil { trace.TLSHandshakeStart() } if err := tc.Handshake(); err != nil { go pconn.conn.Close() if trace != nil && trace.TLSHandshakeDone != nil { trace.TLSHandshakeDone(tls.ConnectionState{}, err) } return nil, err } cs := tc.ConnectionState() if trace != nil && trace.TLSHandshakeDone != nil { trace.TLSHandshakeDone(cs, nil) } pconn.tlsState = &cs } } else { // 如果不是https协议就来到这里,使用tcp向httpserver拨号,获取一个tcp连接。 conn, err := t.dial(ctx, "tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } // 将获取到tcp连接交给我们的persistConn维护 pconn.conn = conn // 处理https相关逻辑 if cm.scheme() == "https" { var firstTLSHost string if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil { return nil, wrapErr(err) } if err = pconn.addTLS(firstTLSHost, trace); err != nil { return nil, wrapErr(err) } } } // Proxy setup. switch { // 如果代理URL为空,不做任何处理 case cm.proxyURL == nil: // Do nothing. Not using a proxy. // case cm.proxyURL.Scheme == "socks5": conn := pconn.conn d := socksNewDialer("tcp", conn.RemoteAddr().String()) if u := cm.proxyURL.User; u != nil { auth := &socksUsernamePassword{ Username: u.Username(), } auth.Password, _ = u.Password() d.AuthMethods = []socksAuthMethod{ socksAuthMethodNotRequired, socksAuthMethodUsernamePassword, } d.Authenticate = auth.Authenticate } if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil { conn.Close() return nil, err } case cm.targetScheme == "http": pconn.isProxy = true if pa := cm.proxyAuth(); pa != "" { pconn.mutateHeaderFunc = func(h Header) { h.Set("Proxy-Authorization", pa) } } case cm.targetScheme == "https": conn := pconn.conn hdr := t.ProxyConnectHeader if hdr == nil { hdr = make(Header) } connectReq := &Request{ Method: "CONNECT", URL: &url.URL{Opaque: cm.targetAddr}, Host: cm.targetAddr, Header: hdr, } if pa := cm.proxyAuth(); pa != "" { connectReq.Header.Set("Proxy-Authorization", pa) } connectReq.Write(conn) // Read response. // Okay to use and discard buffered reader here, because // TLS server will not speak until spoken to. br := bufio.NewReader(conn) resp, err := ReadResponse(br, connectReq) if err != nil { conn.Close() return nil, err } if resp.StatusCode != 200 { f := strings.SplitN(resp.Status, " ", 2) conn.Close() if len(f) < 2 { return nil, errors.New("unknown status code") } return nil, errors.New(f[1]) } } if cm.proxyURL != nil && cm.targetScheme == "https" { if err := pconn.addTLS(cm.tlsHost(), trace); err != nil { return nil, err } } if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok { return &persistConn{alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil } } if t.MaxConnsPerHost > 0 { pconn.conn = &connCloseListener{Conn: pconn.conn, t: t, cmKey: pconn.cacheKey} } // 初始化persistConn的bufferReader和bufferWriter pconn.br = bufio.NewReader(pconn) // 可以从上面给pconn维护的tcpConn中读数据 pconn.bw = bufio.NewWriter(persistConnWriter{pconn})// 可以往上面pconn维护的tcpConn中写数据 // 新开启两条和persistConn相关的go协程。 go pconn.readLoop() go pconn.writeLoop() return pconn, nil }
上面的两条goroutine 和 br bw共同完成如下图的流程
发送请求#
发送req的逻辑在http包的下的tranport包中的func (t *Transport) roundTrip(req *Request) (*Response, error) {}
函数中。
如下:
// 发送treq resp, err = pconn.roundTrip(treq) // 跟进roundTrip // 可以看到他将一个writeRequest结构体类型的实例写入了writech中 // 而这个writech会被上图中的writeLoop消费,借助bufferWriter写入tcp连接中,完成往服务端数据的发送。 pc.writech <- writeRequest{req, writeErrCh, continueCh}