Golang 网络编程(三)

简介: Golang 网络编程(三)

HttpClient源码阅读#


DemoCode#


func main() {
  // 创建连接池
  // 创建客户端,绑定连接池
  // 发送请求
  // 读取响应
  transport := &http.Transport{
    DialContext: (&net.Dialer{
      Timeout:   30 * time.Second, // 连接超时
      KeepAlive: 30 * time.Second, // 长连接存活的时间
    }).DialContext,
    // 最大空闲连接数
    MaxIdleConns:          100,             
    // 超过最大空闲连接数的连接会在IdleConnTimeout后被销毁
    IdleConnTimeout:       10 * time.Second, 
    TLSHandshakeTimeout:   10 * time.Second, // tls握手超时时间
    ExpectContinueTimeout: 1 * time.Second,  // 100-continue 状态码超时时间
  }
  // 创建客户端
  client := &http.Client{
    Timeout:   time.Second * 10, //请求超时时间
    Transport: transport,
  }
  // 请求数据,获得响应
  res, err := client.Get("http://localhost:8081/login")
  if err != nil {
    fmt.Printf("error : %v", err)
    return
  }
  defer res.Body.Close()
  // 处理数据
  bytes, err := ioutil.ReadAll(res.Body)
  if err != nil {
    fmt.Printf("error : %v", err)
    return
  }
  fmt.Printf("Read from http server res:[%v]", string(bytes))
}


整理思路#


http.Client的代码其实是很多的,全部很细的过一遍肯定也会难度,下面可能也是只能提及其中的一部分。


首先明白一件事,我们编写的HttpClient是在干什么?(虽然这个问题很傻,但是总得问一下)是在发送Http请求。


一般我们在开发的时候,更多的编写的是HttpServer的代码。是在处理Http请求, 而不是去发送Http请求,Http请求都是是前端通过ajax经由浏览器发送到后端的。


其次,Http请求实际上是建立在tcp连接之上的,所以如果我们去看http.Client肯定能找到net.Dial("tcp",adds)相关的代码。


那也就是说,我们要看看,http.Client是如何在和服务端建立连接、发送数据、接收数据的。


重要的struct#


http.Client中有机几个比较重要的struct,如下

http.Client结构体中封装了和http请求相关的属性,诸如 cookie,timeout,redirect以及Transport。


type Client struct {
  Transport RoundTripper
  CheckRedirect func(req *Request, via []*Request) error
  Jar CookieJar
  Timeout time.Duration
}


Tranport实现了RoundTrpper接口:


type RoundTripper interface {   
  // 1、RoundTrip会去执行一个简单的 Http Trancation,并为requestt返回一个响应
  // 2、RoundTrip不会尝试去解析response
  // 3、注意:只要返回了Reponse,无论response的状态码是多少,RoundTrip返回的结果:err == nil 
  // 4、RoundTrip将请求发送出去后,如果他没有获取到response,他会返回一个非空的err。
  // 5、同样,RoundTrip不会尝试去解析诸如重定向、认证、cookie这种更高级的协议。 
  // 6、除了消费和关闭请求体之外,RoundTrip不会修改request的其他字段
  // 7、RoundTrip可以在一个单独的gorountine中读取request的部分字段。一直到ResponseBody关闭之前,调用者都不能取消,或者重用这个request
  // 8、RoundTrip始终会保证关闭Body(包含在发生err时)。根据实现的不同,在RoundTrip关闭前,关闭Body这件事可能会在一个单独的goroutine中去做。这就意味着,如果调用者想将请求体用于后续的请求,必须等待知道发生Close
  // 9、请求的URL和Header字段必须是被初始化的。 
  RoundTrip(*Request) (*Response, error)
}


看上面RoundTrpper接口,它里面只有一个方法RoundTrip,方法的作用就是执行一次Http请求,发送Request然后获取Response。


RoundTrpper被设计成了一个支持并发的结构体。


Transport结构体如下:


type Transport struct {
  idleMu     sync.Mutex
   // user has requested to close all idle conns
  wantIdle   bool
  // Transport的作用就是用来建立一个连接,这个idleConn就是Transport维护的空闲连接池。
  idleConn   map[connectMethodKey][]*persistConn // most recently used at end
  idleConnCh map[connectMethodKey]chan *persistConn
}


其中的connectMethodKey也是结构体:


type connectMethodKey struct {
  // proxy 代理的URL,当他不为空时,就会一直使用这个key 
  // scheme 协议的类型, http https
  // addr 代理的url,也就是下游的url
  proxy, scheme, addr string
}


persistConn是一个具体的连接实例,包含连接的上下文。


type persistConn struct {
  // alt可选地指定TLS NextProto RoundTripper。 
  // 这用于今天的HTTP / 2和以后的将来的协议。 如果非零,则其余字段未使用。
  alt RoundTripper
  t         *Transport
  cacheKey  connectMethodKey
  conn      net.Conn
  tlsState  *tls.ConnectionState
  // 用于从conn中读取内容
  br        *bufio.Reader       // from conn
  // 用于往conn中写内容
  bw        *bufio.Writer       // to conn
  nwrite    int64               // bytes written
  // 他是个chan,roundTrip会将readLoop中的内容写入到reqch中
  reqch     chan requestAndChan 
  // 他是个chan,roundTrip会将writeLoop中的内容写到writech中
  writech   chan writeRequest  
  closech   chan struct{}       // closed when conn closed


另外补充一个结构体:Request,他用来描述一次http请求的实例,它定义于http包request.go, 里面封装了对Http请求相关的属性


type Request struct {
   Method string
   URL *url.URL
   Proto      string // "HTTP/1.0"
   ProtoMajor int    // 1
   ProtoMinor int    // 0
   Header Header
   Body io.ReadCloser
   GetBody func() (io.ReadCloser, error)
   ContentLength int64
   TransferEncoding []string
   Close bool
   Host string
   Form url.Values
   PostForm url.Values
   MultipartForm *multipart.Form
   Trailer Header
   RemoteAddr string
   RequestURI string
   TLS *tls.ConnectionState
   Cancel <-chan struct{}
   Response *Response
   ctx context.Context
}


这几个结构体共同完成如下图所示http.Client的工作流程



流程#


我们想发送一次Http请求。首先我们需要构造一个Request,Request本质上是对Http协议的描述(因为大家使用的都是Http协议,所以将这个Request发送到HttpServer后,HttpServer能识别并解析它)。


// 从这行代码开始往下看
  res, err := client.Get("http://localhost:8081/login")
// 跟进Get
  req, err := NewRequest("GET", url, nil)
  if err != nil {
    return nil, err
  }
  return c.Do(req)
// 跟进Do
  func (c *Client) Do(req *Request) (*Response, error) {
  return c.do(req)
 } 
// 跟进do,do函数中有下面的逻辑,可以看到执行完send后已经拿到返回值了。所以我们得继续跟进send方法
  if resp, didTimeout, err = c.send(req, deadline); err != nil 
// 跟进send方法,可以看到send中还有一send方法,入参分别是:request,tranpost,deadline
// 到现在为止,我们没有看到有任何和服务端建立连接的动作发生,但是构造的req和拥有连接池的tranport已经见面了~
  resp, didTimeout, err = send(req, c.transport(), deadline)
// 继续跟进这个send方法,看到了调用了rt的RoundTrip方法。
// 这个rt就是我们编写HttpClient代码时创建的,绑定在http.Client上的tranport实例。
// 这个RoundTrip方法的作用我们在上面已经说过了,最直接的作用就是:发送request 并获取response。
  resp, err = rt.RoundTrip(req)


但是RoundTrip他是个定义在RoundTripper接口中的抽象方法,我们看代码肯定是要去看具体的实现嘛

这里可以使用断点调试法:在上面最后一行上打上断点,会进入到他的具体实现中。从图中可以看到具体的实现在roundtrip中。



RoundTrip中调用的函数是我们自定义的transport的roundTrip函数, 跟进去如下:

紧接着我们需要一个conn,这个conn我们通过Transport可以获取到。conn的类型为persistConn。


// roundTrip函数中又一个无限for循环
for {
    // 检查请求的上下文是否关闭了
    select {
    case <-ctx.Done():
      req.closeBody()
      return nil, ctx.Err()
    default:
    }
    // 对传递进来的req进行了有一层的封装,封装后的这个treq可以被roundTrip修改,所以每次重试都会新建
    treq := &transportRequest{Request: req, trace: trace}
    cm, err := t.connectMethodForRequest(treq)
    if err != nil {
      req.closeBody()
      return nil, err
    }
    // 到这里真的执行从tranport中获取和对应主机的连接,这个连接可能是http、https、http代理、http代理的高速缓存, 但是无论如何我们都已经准备好了向这个连接发送treq
    // 这里获取出来的连接就是我们在上文中提及的persistConn
    pconn, err := t.getConn(treq, cm)
    if err != nil {
      t.setReqCanceler(req, nil)
      req.closeBody()
      return nil, err
    }
    var resp *Response
    if pconn.alt != nil {
      // HTTP/2 path.
      t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per host
      t.setReqCanceler(req, nil)   // not cancelable with CancelRequest
      resp, err = pconn.alt.RoundTrip(req)
    } else {
      // 调用persistConn的roundTrip方法,发送treq并获取响应。
      resp, err = pconn.roundTrip(treq)
    }
    if err == nil {
      return resp, nil
    }
    if !pconn.shouldRetryRequest(req, err) {
      // Issue 16465: return underlying net.Conn.Read error from peek,
      // as we've historically done.
      if e, ok := err.(transportReadFromServerError); ok {
        err = e.err
      }
      return nil, err
    }
    testHookRoundTripRetried()
    // Rewind the body if we're able to.  (HTTP/2 does this itself so we only
    // need to do it for HTTP/1.1 connections.)
    if req.GetBody != nil && pconn.alt == nil {
      newReq := *req
      var err error
      newReq.Body, err = req.GetBody()
      if err != nil {
        return nil, err
      }
      req = &newReq
    }
  }


整理思路:然后看上面代码中获取conn和roundTrip的实现细节。


我们需要一个conn,这个conn可以通过Transport获取到。conn的类型为persistConn。但是不管怎么样,都得先获取出 persistConn,才能进一步完成发送请求再得到服务端到响应。


然后关于这个persistConn结构体其实上面已经提及过了。重新贴在下面


type persistConn struct {
  // alt可选地指定TLS NextProto RoundTripper。 
  // 这用于今天的HTTP / 2和以后的将来的协议。 如果非零,则其余字段未使用。
  alt RoundTripper
  conn      net.Conn
  t         *Transport
  br        *bufio.Reader  // 用于从conn中读取内容
  bw        *bufio.Writer  // 用于往conn中写内容
  // 他是个chan,roundTrip会将readLoop中的内容写入到reqch中
  reqch     chan requestAndChan 
  // 他是个chan,roundTrip会将writeLoop中的内容写到writech中
  nwrite    int64               // bytes written
  cacheKey  connectMethodKey
  tlsState  *tls.ConnectionState
  writech   chan writeRequest  
  closech   chan struct{}       // closed when conn closed


跟进t.getConn(treq, cm)代码如下:


// 先尝试从空闲缓冲池中取得连接
  // 所谓的空闲缓冲池就是Tranport结构体中的: idleConn map[connectMethodKey][]*persistConn 
  // 入参位置的cm如下:
  /* type connectMethod struct {
      // 代理的url,如果没有代理的话,这个值为nil
      proxyURL     *url.URL 
      // 连接所使用的协议 http、https
      targetScheme string
      // 如果proxyURL指定了http代理或者是https代理,并且使用的协议是http而不是https。
      // 那么下面的targetAddr就会不包含在connect method key中。
      // 因为socket可以复用不同的targetAddr值
      targetAddr string
  }*/
  t.getIdleConn(cm);
  // 空闲缓冲池有的空闲连接的话返回conn,否则进行如下的select
  select {
    // todo 这里我还不确定是在干什么,目前猜测是这样的:每个服务器能打开的socket句柄是有限的
    // 每次来获取链接的时候,我们就计数+1。当整体的句柄在Host允许范围内时我们不做任何干涉~
    case <-t.incHostConnCount(cmKey):
      // count below conn per host limit; proceed
    // 重新尝试从空闲连接池中获取连接,因为可能有的连接使用完后被放回连接池了
    case pc := <-t.getIdleConnCh(cm):
      if trace != nil && trace.GotConn != nil {
        trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
      }
      return pc, nil
    // 请求是否被取消了
    case <-req.Cancel:
      return nil, errRequestCanceledConn
    // 请求的上下文是否Done掉了
    case <-req.Context().Done():
      return nil, req.Context().Err()
    case err := <-cancelc:
      if err == errRequestCanceled {
        err = errRequestCanceledConn
      }
      return nil, err
    }
  // 开启新的gorountine新建连接一个连接
  go func() {
    /**
    * 新建连接,方法底层封装了tcp client dial相关的逻辑
    * conn, err := t.dial(ctx, "tcp", cm.addr())
    * 以及根据不同的targetScheme构建不同的request的逻辑。
    */
    // 获取到persistConn
    pc, err := t.dialConn(ctx, cm)
    // 将persistConn写到chan中
    dialc <- dialRes{pc, err}
  }()
  // 再尝试从空闲连接池中获取
  idleConnCh := t.getIdleConnCh(cm)
  select {
  // 如果上面的go协程拨号成功了,这里就能取出值来
  case v := <-dialc:
    // Our dial finished.
    if v.pc != nil {
      if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
        trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
      }
      return v.pc, nil
    }
    // Our dial failed. See why to return a nicer error
    // value.
    // 将Host的连接-1
    t.decHostConnCount(cmKey)
    select {
    ...


transport.dialConn#


下面代码中的cm长这样



// dialConn是Transprot的方法
// 入参:context上下文, connectMethod
// 出参:persisnConn
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
  // 构建将要返回的 persistConn
  pconn := &persistConn{
    t:             t,
    cacheKey:      cm.key(),
    reqch:         make(chan requestAndChan, 1),
    writech:       make(chan writeRequest, 1),
    closech:       make(chan struct{}),
    writeErrCh:    make(chan error, 1),
    writeLoopDone: make(chan struct{}),
  }
  trace := httptrace.ContextClientTrace(ctx)
  wrapErr := func(err error) error {
    if cm.proxyURL != nil {
      // Return a typed error, per Issue 16997
      return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
    }
    return err
  }
  // 判断cm中使用的协议是否是https
  if cm.scheme() == "https" && t.DialTLS != nil {
    var err error
    pconn.conn, err = t.DialTLS("tcp", cm.addr())
    if err != nil {
      return nil, wrapErr(err)
    }
    if pconn.conn == nil {
      return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)"))
    }
    if tc, ok := pconn.conn.(*tls.Conn); ok {
      // Handshake here, in case DialTLS didn't. TLSNextProto below
      // depends on it for knowing the connection state.
      if trace != nil && trace.TLSHandshakeStart != nil {
        trace.TLSHandshakeStart()
      }
      if err := tc.Handshake(); err != nil {
        go pconn.conn.Close()
        if trace != nil && trace.TLSHandshakeDone != nil {
          trace.TLSHandshakeDone(tls.ConnectionState{}, err)
        }
        return nil, err
      }
      cs := tc.ConnectionState()
      if trace != nil && trace.TLSHandshakeDone != nil {
        trace.TLSHandshakeDone(cs, nil)
      }
      pconn.tlsState = &cs
    }
  } else {
    // 如果不是https协议就来到这里,使用tcp向httpserver拨号,获取一个tcp连接。
    conn, err := t.dial(ctx, "tcp", cm.addr())
    if err != nil {
      return nil, wrapErr(err)
    }
    // 将获取到tcp连接交给我们的persistConn维护
    pconn.conn = conn
    // 处理https相关逻辑
    if cm.scheme() == "https" {
      var firstTLSHost string
      if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
        return nil, wrapErr(err)
      }
      if err = pconn.addTLS(firstTLSHost, trace); err != nil {
        return nil, wrapErr(err)
      }
    }
  }
  // Proxy setup.
  switch {
  // 如果代理URL为空,不做任何处理  
  case cm.proxyURL == nil:
    // Do nothing. Not using a proxy.
  //   
  case cm.proxyURL.Scheme == "socks5":
    conn := pconn.conn
    d := socksNewDialer("tcp", conn.RemoteAddr().String())
    if u := cm.proxyURL.User; u != nil {
      auth := &socksUsernamePassword{
        Username: u.Username(),
      }
      auth.Password, _ = u.Password()
      d.AuthMethods = []socksAuthMethod{
        socksAuthMethodNotRequired,
        socksAuthMethodUsernamePassword,
      }
      d.Authenticate = auth.Authenticate
    }
    if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
      conn.Close()
      return nil, err
    }
  case cm.targetScheme == "http":
    pconn.isProxy = true
    if pa := cm.proxyAuth(); pa != "" {
      pconn.mutateHeaderFunc = func(h Header) {
        h.Set("Proxy-Authorization", pa)
      }
    }
  case cm.targetScheme == "https":
    conn := pconn.conn
    hdr := t.ProxyConnectHeader
    if hdr == nil {
      hdr = make(Header)
    }
    connectReq := &Request{
      Method: "CONNECT",
      URL:    &url.URL{Opaque: cm.targetAddr},
      Host:   cm.targetAddr,
      Header: hdr,
    }
    if pa := cm.proxyAuth(); pa != "" {
      connectReq.Header.Set("Proxy-Authorization", pa)
    }
    connectReq.Write(conn)
    // Read response.
    // Okay to use and discard buffered reader here, because
    // TLS server will not speak until spoken to.
    br := bufio.NewReader(conn)
    resp, err := ReadResponse(br, connectReq)
    if err != nil {
      conn.Close()
      return nil, err
    }
    if resp.StatusCode != 200 {
      f := strings.SplitN(resp.Status, " ", 2)
      conn.Close()
      if len(f) < 2 {
        return nil, errors.New("unknown status code")
      }
      return nil, errors.New(f[1])
    }
  }
  if cm.proxyURL != nil && cm.targetScheme == "https" {
    if err := pconn.addTLS(cm.tlsHost(), trace); err != nil {
      return nil, err
    }
  }
  if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
    if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
      return &persistConn{alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil
    }
  }
  if t.MaxConnsPerHost > 0 {
    pconn.conn = &connCloseListener{Conn: pconn.conn, t: t, cmKey: pconn.cacheKey}
  }
  // 初始化persistConn的bufferReader和bufferWriter
  pconn.br = bufio.NewReader(pconn) // 可以从上面给pconn维护的tcpConn中读数据
  pconn.bw = bufio.NewWriter(persistConnWriter{pconn})// 可以往上面pconn维护的tcpConn中写数据 
  // 新开启两条和persistConn相关的go协程。
  go pconn.readLoop()
  go pconn.writeLoop()
  return pconn, nil
}


上面的两条goroutine 和 br bw共同完成如下图的流程



发送请求#


发送req的逻辑在http包的下的tranport包中的func (t *Transport) roundTrip(req *Request) (*Response, error) {}函数中。


如下:


// 发送treq
  resp, err = pconn.roundTrip(treq)
  // 跟进roundTrip
  // 可以看到他将一个writeRequest结构体类型的实例写入了writech中
  // 而这个writech会被上图中的writeLoop消费,借助bufferWriter写入tcp连接中,完成往服务端数据的发送。
  pc.writech <- writeRequest{req, writeErrCh, continueCh}


相关文章
|
1月前
|
Go
golang力扣leetcode 2039.网络空闲的时刻
golang力扣leetcode 2039.网络空闲的时刻
16 0
|
7月前
|
监控 网络协议 Go
Golang抓包:实现网络数据包捕获与分析
Golang抓包:实现网络数据包捕获与分析
|
1天前
|
Go
golang读取网络字节并解压zip
golang读取网络字节并解压zip
3 0
|
1月前
|
存储 网络协议 Go
Golang网络聊天室案例
Golang网络聊天室案例
43 2
Golang网络聊天室案例
|
安全 Java Go
Golang出现泛型后,Gin怎么封装网络请求处理
Go 1.18后出现泛型,小白怎么使用Gin框架怎么根据泛型封装客户端请求,
483 0
|
网络协议 安全 Java
Golang 网络编程(二)
Golang 网络编程(二)
147 0
|
XML 存储 JSON
Golang 网络编程(一)
Golang 网络编程(一)
149 0
|
运维 监控 网络协议
golang 服务诡异499、504网络故障排查
事故经过 排查 总结 事故经过 11-01 12:00 中午午饭期间,手机突然收到业务网关非200异常报警,平时也会有一些少量499或者网络抖动问题触发报警,但是很快就会恢复(目前配置的报警阈值是5%,阈值跟当时的采样窗口qps有直接关系)。
5158 0
|
负载均衡 网络协议 前端开发
【开源】gnet: 一个轻量级且高性能的 Golang 网络库
gnet 是一个基于 Event-Loop 事件驱动的高性能和轻量级网络库。这个库直接使用 epoll 和 kqueue 系统调用而非标准 Golang 网络包:net 来构建网络应用,它的工作原理类似于两个开源的网络库:libuv 和 libevent。
3203 0