《ranch 源代码分析 (Erlang实现的网络库)》

简介: 最新版本的cowboy已经把和网络相关的逻辑放到了ranch的代码里。ranch是一个类似ANET的网络库. 它的设计目标是高并发,低延迟,模块化。

最新版本的cowboy已经把和网络相关的逻辑放到了ranch的代码里。ranch是一个类似ANET的网络库. 它的设计目标是高并发,低延迟,模块化。

之前一直在使用C++,最近学习了erlang,对比一下这两门不同思路的语言

1) 语言底层:最显着的不同是erlang是解释型动态语言,C++是静态的。
2) 语言功能:Erlang是函数式和轻量级进程(要彻底理解函数式最直接的就是通读SICP)。变量不可变。所以,函数是没有状态的,状态体现在函数的参数上。而C++可以修改一切内存.
   另外轻量级进程:进程是用户态级别的结构体。所以进程的创建和切换代价很小。而C++的pthread对应系统的一个线程.
3) 设计范式:个人理解,从代码上erlang中的一个模块对应C++的一个类,都是封装了对一个实体的操作. 不同的是C++的操作是和对象绑定的(编译器自动把对象的指针作为成员函数的第一个参数),而erlang是手动把要操作的实体作为第一个参数。
   erlang中的进程对应到C++的一份对象; erlang中的一个消息对应c++的一次对象的函数调用; 由于erlang中的进程是廉价的,所以有着和C++完全不同的面向进程的编程范式。因此OTP抽象出了’server模式‘,’监督模式‘,’状态机模式‘等模式。
   而C++的23种设计模式本身就是基于单线程的思路。
   举个例子,在C++中调用成员函数 `A a; a.f();' 被编译器翻译成 `A::f(&a);'。
   而erlang中调用模块的函数修改对象的状态,要这样做 `A:f(Ref)',Ref是要修改的对象,A:f是修改对象的方法。

一个例子 hello,world!

ranch的源码自带了一个tcp_echo的例子

1) 要使用ranch首先要启动ranch,application:start(ranch)。
2) 提供一个回调模块echo_protocol。
3) 启动ranch监听ranch:start_ranch_acceptors_suplistener。

深入代码

从application:start(ranch)开始

ranch_sup监督者会启动一个ranch_server,负责维护配置信息,低层存储用ets。ranch_server是gen_server模式,接收各种消息。
仔细察看ranch_server.erl。这个模块提供了操作ranch_server的各种函数。
这个模块文件中所有被export出来的函数,就是对外部的接口,即外部模块可以通过调用这些模块改变’对象‘的状态。对象就是ranch_server进程。对象的指针就是每个函数的第一个参数。
举个例子 函数`set_port(Ref, Port)'修改Ref指向对象的port。

用户的代码是从 ranch:start_listener开始

用户需要指定网络参数

{ok, _} = ranch:start_listener(tcp_echo, 1,
        ranch_tcp, [{port, 5555}], echo_protocol, []),
注意:整个ranch对外面的接口全部在ranch.erl中。这里通过调用ranch:start_listener启动监听,同时提供一个数据到达后的回调函数。
 tcp_echo:标志ranch之上的应用。所有的操作都要把tcp_echo带上。
 ranch_tcp:ranch提供tcp和ssl两种协议。实现多态.
 [{port, 5555}]:tcp的参数。
 echo_protocol:回调函数。

ranch启动服务

Res = supervisor:start_child(ranch_sup, child_spec(Ref, NbAcceptors,
         Transport, TransOpts, Protocol, ProtoOpts)),
注意:此时代码还是运行在tcp_echo的进程中。
 ranch:start_listener在ranch_sup的监督树中添加一个ranch_listener_sup的监督进程。进程的描述由函数 `child_spec` 产生。
{{ranch_listener_sup, Ref}, {ranch_listener_sup, start_link, [
        Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts
    ]}, permanent, 5000, supervisor, [ranch_listener_sup]}.
到这里tcp_echo_app返回。下面的工作交给ranch内部的相关模块。

ranch_listener_sup 监督者

这个模块是ranch中最重要的模块了。acceptors 池的管理,conns的管理都是该模块负责.

image.png

ranch_listener_sup 会启动两个监督者:ranch_conns_sup 和 ranch_acceptors_sup。
init({Ref, NbAcceptors, Transport, TransOpts, Protocol}) ->
    ChildSpecs = [
        %% conns_sup
        {ranch_conns_sup, {ranch_conns_sup, start_link,
                [Ref, Transport, Protocol]},
            permanent, infinity, supervisor, [ranch_conns_sup]},
        %% acceptors_sup
        {ranch_acceptors_sup, {ranch_acceptors_sup, start_link,
                [Ref, NbAcceptors, Transport, TransOpts]
            }, permanent, infinity, supervisor, [ranch_acceptors_sup]}
    ],
    {ok, {{rest_for_one, 10, 10}, ChildSpecs}}.
注意:一定要把ranch_conns_sup放在前面。监督者行为模式会根据顺序启动.

ranch_conns_sup监督者

起的是监督者的作用,监控处理连接的进程 (回调函数运行的进程)。但并没有实现 supervisor 模式,因为这个监督者做的事情并不是简单的启动子进程。还要做检查子进程的状态,处理最大连接数等。
start_link(Ref, Transport, Protocol) ->
    proc_lib:start_link(?MODULE, init, [self(), Ref, Transport, Protocol]).
启动一个子进程,执行init函数。init函数初始化loop的状态。
%% CurConns -> 当前连接数目
     %% MaxConns -> 最大连接数
     %% conns sup 进程一直循环使用 receive 接收消息
     loop(State=#state{parent=Parent, ref=Ref,
                transport=Transport, protocol=Protocol, opts=Opts,
        max_conns=MaxConns}, CurConns, NbChildren, Sleepers) ->
        receive
    %% ranch acceptor 获取 client socket 后,发送消息给 conns sup 进程
    %% 此时 client socket 控制进程为 conns sup
        {?MODULE, start_protocol, To, Socket} ->
                     %% 启动新进程,子进程就是最开始的地方使用者指定的回调 echo_protocol:start_link
            case Protocol:start_link(Ref, Socket, Transport, Opts) of 
                {ok, Pid} ->
                             %% 把socket的控制权从当前进程转给echo_protocol子进程,只有这样它才能收到网络数据。
                    %% 注意:这个时候子进程是阻塞住的,这是必须的!因为这个时候子进程还没有控制权,收不到数据。
                    Transport:controlling_process(Socket, Pid),
                    %% 给子进程发送消息, 唤醒子进程。
                    Pid ! {shoot, Ref}, 
                    put(Pid, true),
                    CurConns2 = CurConns + 1,
                    %% 如果还没有到达最大的连接数目
                    if CurConns2 < MaxConns ->
                                     %% 给 acceptor 进程发送消息,此时acceptor进程阻塞在start_protocol函数的receive
                            To ! self(),  
                            loop(State, CurConns2, NbChildren + 1,
                                Sleepers);
                        true ->
                        %% 如果大于最大连接数,当前 acceptor 进入休眠队列(acceptor进程一直在等待消息)
                        %% 注意:Sleeper多了一个acceptor进程。Erlang就是通过不停的更改参数,来改变进程的状态。
                            loop(State, CurConns2, NbChildren + 1,
                                [To|Sleepers])
                    end;
            end;
        %% echo_protocol子进程exit时,发送 EXIT 新号给连接的进程,也就是 conns sup
        %% 通过设置trap_exit,来捕获退出消息。
        %% 从Sleeper的头部取出一个acceptor,给它发送消息,让它唤醒,继续接受新的连接。
        {'EXIT', Pid, _} ->
            erase(Pid),
            [To|Sleepers2] = Sleepers,
            To ! self(),
            loop(State, CurConns - 1, NbChildren - 1, Sleepers2);
    end.
一个简化版的loop函数。列出了两个最重要的消息类型。
 1) start_protocol消息。这个消息的发送由调用本模块的start_protocol函数产生。在ranch_acceptor模块中,当有一个新的连接进来后调用该函数.
start_protocol(SupPid, Socket) ->
            SupPid ! {?MODULE, start_protocol, self(), Socket},
            %% 这个地方ranch_acceptor会同步的等在这个地方 !!!
            receive SupPid -> ok end.
这个消息会使ranch_conns_sup启动一个子进程 ‘Protocol:start_link(Ref, Socket, Transport, Opts)’。Protocol就是使用者最初提供的一个回调。
 2) ‘EXIT’消息, 通过设置 process_flag(trap_exit, true) 可以捕获到子进程的退出消息。并且唤醒一个阻塞的acceptor进程。

ranch_acceptors_sup监督者

执行accept进程的监督者,accept进程的个数是由NbAcceptors控制。
首先使用模块Transport:listen监听端口。
注意:Transport是一个变量,也就是说Erlang可以通过这种方式实现多态。
然后把conns_sup监督者的pid和listen返回的socket传给每一个acceptor子进程。
%% 参数:(tcp_echo, 1, ranch_tcp, [{port, 5555}])
    init([Ref, NbAcceptors, Transport, TransOpts]) ->
    {ok, Socket} = Transport:listen(TransOpts),
    %% acceptor 池配置的多大,就有多少 ranch_acceptor
    Procs = [
        {{acceptor, self(), N}, {ranch_acceptor, start_link, [
            LSocket, Transport, ConnsSup
        ]}, permanent, brutal_kill, worker, []}
            || N <- lists:seq(1, NbAcceptors)],
    {ok, {{one_for_one, 10, 10}, Procs}}.
    ```
#### ranch_acceptor进程
     ranch_acceptor的个数和Nbacceptors一样。
     Erlang中可以多个进程同时accept一个socket。
     ```
     loop(LSocket, Transport, ConnsSup) ->
    _ = case Transport:accept(LSocket, infinity) of
        {ok, CSocket} ->
                %% 获取的连接的socket控制权转交给conns_sup进程,conns_sup再把控制权转交给它的子进程。
            Transport:controlling_process(CSocket, ConnsSup),
            %% 同步的调用,需要等待 conns sup 进程 生成echo_protocol子进程后,返回 self() 为止。
            ranch_conns_sup:start_protocol(ConnsSup, CSocket);
        {error, Reason} when Reason =/= closed ->
            ok
    end,
    ?MODULE:loop(LSocket, Transport, ConnsSup)

比较一下ranch和anet

1) ranch和anet都是高并发的网络库; ranch可以处理海量链接, anet可以处理高吞吐量, 低延迟;

2) 使用的时候都要提供一个回调函数;

3) anet封装了packet, 而ranch提供的还是流服务. 这是应用场景决定的: anet面向的内网, 内网一般是基于特定的业务, 而业务的数据是有结构的. ranch既可以面向内网,也可以面向外网. 而有的应用场景本身就是流式的, 比如http协议. ranch可以不需要等待一个http请求头部到达后再处理, 可以一边接收http请求头一遍解析http的状态机. 当然, 可以等到全部的http请求头到达后在一次性解析. 但是这样处理对于Erlang来说效率是慢的. 对于anet来说效率是快的. 使用者可以在回调中实现自己的协议, 使得ranch支持有结构的数据包.

4) anet中的超时是使用了一个timeout 线程, 每隔一段时间扫面一遍所有的链接最后一次收到数据的时间. 而ranch的超时可以在回调中receive设置超时,很自然的支持.

5) anet支持同一个链接复用不用的请求,通过channelID实现. client端每次发送包的时候都要动态生成一个包的channelID, 并且把<channelID, handler>插入到hashtable中. 发送给server的数据包头部带上这个channeldID. server端返回的时候把channelID 原值返回给client端. client端通过channelID 找到对应的handler. 而Erlang中处理并发的思路就是直接一个进程. 一个请求对应一个进程.

相关文章
|
5月前
|
C++
基于Reactor模型的高性能网络库之地址篇
这段代码定义了一个 InetAddress 类,是 C++ 网络编程中用于封装 IPv4 地址和端口的常见做法。该类的主要作用是方便地表示和操作一个网络地址(IP + 端口)
287 58
|
5月前
|
网络协议 算法 Java
基于Reactor模型的高性能网络库之Tcpserver组件-上层调度器
TcpServer 是一个用于管理 TCP 连接的类,包含成员变量如事件循环(EventLoop)、连接池(ConnectionMap)和回调函数等。其主要功能包括监听新连接、设置线程池、启动服务器及处理连接事件。通过 Acceptor 接收新连接,并使用轮询算法将连接分配给子事件循环(subloop)进行读写操作。调用链从 start() 开始,经由线程池启动和 Acceptor 监听,最终由 TcpConnection 管理具体连接的事件处理。
185 2
|
5月前
基于Reactor模型的高性能网络库之Tcpconnection组件
TcpConnection 由 subLoop 管理 connfd,负责处理具体连接。它封装了连接套接字,通过 Channel 监听可读、可写、关闭、错误等
159 1
|
5月前
|
负载均衡 算法 安全
基于Reactor模式的高性能网络库之线程池组件设计篇
EventLoopThreadPool 是 Reactor 模式中实现“一个主线程 + 多个工作线程”的关键组件,用于高效管理多个 EventLoop 并在多核 CPU 上分担高并发 I/O 压力。通过封装 Thread 类和 EventLoopThread,实现线程创建、管理和事件循环的调度,形成线程池结构。每个 EventLoopThread 管理一个子线程与对应的 EventLoop(subloop),主线程(base loop)通过负载均衡算法将任务派发至各 subloop,从而提升系统性能与并发处理能力。
279 3
|
5月前
基于Reactor模式的高性能网络库github地址
https://github.com/zyi30/reactor-net.git
126 0
|
5月前
基于Reactor模型的高性能网络库之Poller(EpollPoller)组件
封装底层 I/O 多路复用机制(如 epoll)的抽象类 Poller,提供统一接口支持多种实现。Poller 是一个抽象基类,定义了 Channel 管理、事件收集等核心功能,并与 EventLoop 绑定。其子类 EPollPoller 实现了基于 epoll 的具体操作,包括事件等待、Channel 更新和删除等。通过工厂方法可创建默认的 Poller 实例,实现多态调用。
297 60
|
5月前
|
安全 调度
基于Reactor模型的高性能网络库之核心调度器:EventLoop组件
它负责:监听事件(如 I/O 可读写、定时器)、分发事件、执行回调、管理事件源 Channel 等。
291 57
|
5月前
基于Reactor模型的高性能网络库之时间篇
是一个用于表示时间戳(精确到微秒)**的简单封装类
204 57
|
5月前
|
缓存 索引
基于Reactor模式的高性能网络库之缓冲区Buffer组件
Buffer 类用于处理 Socket I/O 缓存,负责数据读取、写入及内存管理。通过预分配空间和索引优化,减少内存拷贝与系统调用,提高网络通信效率,适用于 Reactor 模型中的异步非阻塞 IO 处理。
190 3
|
4月前
|
机器学习/深度学习 算法 数据库
基于GoogleNet深度学习网络和GEI步态能量提取的步态识别算法matlab仿真,数据库采用CASIA库
本项目基于GoogleNet深度学习网络与GEI步态能量图提取技术,实现高精度步态识别。采用CASI库训练模型,结合Inception模块多尺度特征提取与GEI图像能量整合,提升识别稳定性与准确率,适用于智能安防、身份验证等领域。