Etcd源码分析:网络-阿里云开发者社区

开发者社区> 开发与运维> 正文

Etcd源码分析:网络

简介: 配置文件 etcd配置文件位于/etc/etcd/etcd.conf,该配置文件一共有5个section 名称 作用 member 本节点的配置,包括监听服务端口、心跳时间等 cluster 集群配置,包括集群状态、集群名称以及本节点广播地址 proxy 用于网络自动发现服务 security 安.

配置文件

etcd配置文件位于/etc/etcd/etcd.conf,该配置文件一共有5个section

名称 作用
member 本节点的配置,包括监听服务端口、心跳时间等
cluster 集群配置,包括集群状态、集群名称以及本节点广播地址
proxy 用于网络自动发现服务
security 安全配置
logging 日志功能组件

具体配置采集可以见另一个文章《Etcd集群配置和使用》

初次看到配置文件,都会有一个疑问,为什么在members已经设置了监听服务地址,为什么在cluster还要再次设置一次广播地址呢?

原因:etcd主要的通信协议主要是http协议,对于http协议中所周知它是B/S结构,而非C/S结构,只能一端主动给另一端发消息而反过来则不可。所以对于集群来说,双方必须都要知道对方具体监听地址。

服务监听

我们都知道,建立socket服务端一共有5个基本步骤(C语言):
1、创建socket套接字
2、bind地址及端口
3、listen监听服务4
4、accept接收客户端连接
5、启动新线程为客户端服务。
正所谓万变不离其宗,到了etcd中(etcd使用默认golang http模块)也是这些步骤,只不过是被封装了一下(语法糖)

启动流程见《Etcd源码分析:启动篇》

listener

当进入embed/etcd.go里面的StartEtcd()函数的时候

      //为peer创建listener,socket三部曲只到了第二个步骤
    if e.Peers, err = startPeerListeners(cfg); err != nil {
        return e, err
    }
      //为client创建listener,socket三部曲只到了第二个步骤
    if e.sctxs, err = startClientListeners(cfg); err != nil {
        return e, err
    }

在创建了listener之后,开始创建EtcdServer


      //创建EtcdServer并且创建raftNode并运行raftNode
    if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
        return e, err
    }

    // buffer channel so goroutines on closed connections won't wait forever
    e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))

······
    e.Server.Start()

    if err = e.servePeers(); err != nil {
        return e, err
    }
    if err = e.serveClients(); err != nil {
        return e, err
    }
    if err = e.serveMetrics(); err != nil {
        return e, err
    }

    serving = true

Listener有两个分别为:peer listener和client listener,两者大同小异,这里拿peer listener做为分析对象。

func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {
······
    peers = make([]*peerListener, len(cfg.LPUrls))
······
    for i, u := range cfg.LPUrls {   //循环遍历多个peer url
        if u.Scheme == "http" {
            if !cfg.PeerTLSInfo.Empty() {
                plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
            }
            if cfg.PeerTLSInfo.ClientCertAuth {
                plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
            }
        }
        // 构造peerListener对象 监听2380 作为服务端模式
        peers[i] = &peerListener{close: func(context.Context) error { return nil }}
        
        //调用接口,创建listener对象,返回来之后,socket套接字已经完成listener监听流程
        peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)
        if err != nil {
            return nil, err
        }
        // once serve, overwrite with 'http.Server.Shutdown'
        peers[i].close = func(context.Context) error {
            return peers[i].Listener.Close()
        }
        plog.Info("listening for peers on ", u.String())
    }
    return peers, nil
}

下面调用关系为

startPeerListeners()  [embed/etcd.go]  
-> rafthttp.NewListener()  [rafthttp/util.go]
    -> transport.NewTimeoutListener()  [pkg/transport/timeout_listener.go]
        -> newListener()  [pkg/transport/listener.go]
            -> net.Listen()  [golang net库函数]

服务监听

服务端socket需要调用Accept方法,我们来看一下serve方法。方法serve大致内容为:将每个服务放到gorouting中,也就是启动一个协程来监听服务。

先看看servePeers()

func (e *Etcd) servePeers() (err error) {
    ph := etcdhttp.NewPeerHandler(e.Server)
    var peerTLScfg *tls.Config
    if !e.cfg.PeerTLSInfo.Empty() {
        if peerTLScfg, err = e.cfg.PeerTLSInfo.ServerConfig(); err != nil {
            return err
        }
    }

    for _, p := range e.Peers {
        gs := v3rpc.Server(e.Server, peerTLScfg)
        m := cmux.New(p.Listener)
        go gs.Serve(m.Match(cmux.HTTP2()))
        srv := &http.Server{
            Handler:     grpcHandlerFunc(gs, ph),
            ReadTimeout: 5 * time.Minute,
            ErrorLog:    defaultLog.New(ioutil.Discard, "", 0), // do not log user error
        }
        go srv.Serve(m.Match(cmux.Any()))
        p.serve = func() error { return m.Serve() }
        p.close = func(ctx context.Context) error {
            // gracefully shutdown http.Server
            // close open listeners, idle connections
            // until context cancel or time-out
            stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv})
            return nil
        }
    }

    // start peer servers in a goroutine
    for _, pl := range e.Peers {
        go func(l *peerListener) {
            e.errHandler(l.serve())
        }(pl)
    }
    return nil
}

1、生成http.hander 用于处理peer请求;
2、在for循环里面,起一些goroutine,调用Server()函数来接受Listener传入的连接。

我们来看看NewPeerHandler()

func newPeerHandler(cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
    mh := &peerMembersHandler{
        cluster: cluster,
    }

 //将url和业务层handler注册到servemux中,也就是每一个url请求都会有其对应的handler进行处理
     //初始化一个Serve Multiplexer结构
      mux := http.NewServeMux()
    mux.HandleFunc("/", http.NotFound)
    mux.Handle(rafthttp.RaftPrefix, raftHandler)
    mux.Handle(rafthttp.RaftPrefix+"/", raftHandler) 
    mux.Handle(peerMembersPrefix, mh)  //处理请求/members handler是mh,即peerMembersHandler
    if leaseHandler != nil {
        mux.Handle(leasehttp.LeasePrefix, leaseHandler)
        mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
    }
    mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion))
    return mux
}

应用层业务逻辑需要自己注册url和handler,这样才能保证每个http request都能够被处理。而每个handler都必须要实现对应接口ServeHTTP,例如peerMembersHandler,实现的ServeHTTP接口是用于返回集群成员列表

那么此处只是完成注册,那么在什么地方会调用此处handler?

答案是在ServeHTTP()里面

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
开发与运维
使用钉钉扫一扫加入圈子
+ 订阅

集结各类场景实战经验,助你开发运维畅行无忧

其他文章