[Erlang 0012]Erlang Process input queue

简介:

    Erlang进程有自己的消息队列来保存接收到的消息,新接收到的消息放在队列的尾部。Erlang的接收原语receive就是用来从消息队列中选择性提取消息的。receive提取消息的过程是:从消息队列的第一条消息开始匹配,如果有一条消息可以匹配上就从消息队列中移除,并执行相应的消息处理逻辑。如果没有模式可以匹配消息队列中的消息,这条消息就会保留在消息队列中。如果检查到消息队列中的最后一条消息还没有找到可以匹配的消息,进程就会阻塞直到接收到下一条消息再一次出发提取过程。

    我们能不能直观的看到这个过程呢?Erlang对运行时信息的提取提供了很好的支持,我们要查看的是一个进程在运行时的信息,使用的方法:erlang:process_info/1 .这个方法接收的参数就是进程的PID,返回的数据结果:

[{current_function,{looper,loop,0}},
{initial_call,{looper,loop,0}},
{status,waiting},
{message_queue_len,4},
{messages,[abcd,abcde,abcdef,abcdefg]},
{links,[]},{dictionary,[]},
{trap_exit,false},{error_handler,error_handler},
{priority,normal},{group_leader,<0.29.0>},
{total_heap_size,233},{heap_size,233},
{stack_size,1},{reductions,41},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]

    上面erlang:process_info(Pid).的执行结果中,红色标出就是消息队列中的消息数量,蓝色标出的就是消息队列的内容。下面我们就通过一系列的Demo来理解Erlang进程消息队列的处理机制.

 

 消息队列堆积的情况

      我们先把进行测试的脚手架代码准备好,逻辑很简单如果接收到abc消息就输出一下然后继续接收消息,这通过尾递归loop方法就可以实现。

-module(looper).

-compile(export_all).

loop() ->
     receive 
        abc -> 
                  io:format("Receive abc. ~n "),
                  loop();
        stop-> 
                  io:format("stop"),
                  stop
      end.    

 

    模拟消息堆积的方法很简单,我们不停向这个进程发送无法匹配的消息就可以了,然后我们查看进程的运行时状态,下面是shell中执行的结果,大家看注释:

(demo@192.168.1.123)1> Pid= spawn(looper,loop,[]).    %%启动进程返回进程PID
<0.38.0>
(demo@192.168.1.123)2> Pid!abc.                              %%向进程发送abc消息
Receive abc.                                                              %% abc消息正常处理
abc
(demo@192.168.1.123)4> Pid!abcd.                            %%向进程发送消息abcd
abcd
(demo@192.168.1.123)5> Pid!abcde.                            %%进程发送消息abcde
abcde
(demo@192.168.1.123)6> Pid!abcdef.
abcdef
(demo@192.168.1.123)7> Pid!abcdefg.
abcdefg
(demo@192.168.1.123)9> erlang:process_info(Pid).      %%查看进程状态
[{current_function,{looper,loop,0}},
{initial_call,{looper,loop,0}},
{status,waiting},
{message_queue_len,4},
{messages,[abcd,abcde,abcdef,abcdefg]},              %%这里能看到积压的消息
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,41},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)10> Pid!haha.            %%再发送一条垃圾消息haha
haha
(demo@192.168.1.123)11> erlang:process_info(Pid).
[{current_function,{looper,loop,0}},
{initial_call,{looper,loop,0}},
{status,waiting},
{message_queue_len,5},
{messages,[abcd,abcde,abcdef,abcdefg,haha]},   %%可以看到haha消息被放在了消息队列的队尾
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,41},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)12>

 

按照优先级接收消息

 

下面的代码范例来自LYSE,可以看到首先是处理高优先级的消息,如果高优先级的消息处理完毕之后,处理低优先级的消息.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
important() ->
     receive
         {Priority, Message} when Priority > 10 ->
             [Message | important()]
     after 0 ->
         normal()
     end.
  
normal() ->
     receive
         {_, Message} ->
             [Message | normal()]
     after 0 ->
         []
     end.

  

 


  定时清理堆积的垃圾消息

   堆积的垃圾消息会慢慢吃掉内存,而且堆积的消息在Selective Receive过程中会不断地被遍历检查,成为负担,我们现在就添加一个定时清理堆积消息的逻辑:

-module(looper).

-compile(export_all).

      
%% P= spawn(looper,loop2,[]).      
%% erlang:process_info(P).
%%
loop2() -> 
     receive 
             abc -> 
                  io:format("Receive abc. ~n "),
                  loop2();
        stop-> 
                  io:format("stop"),
                  stop
         after 15000 -> 
             receive 
                      Any ->
                               io:format("Receive ~p ~n ",[ Any])
              end,         
        
                    io:format("clear . ~n "),
                    loop2()
      end. 

  做法也很简单,添加一个超时,超时之后用一个可以接收任意消息(Any)的receive代码段来从消息队列中提取一条消息.为了留出足够的时间来输入命令,我们把超时时间定为15000(15s).好了,启动shell重新来过,大家还是看我添加的注释:

D:\Erlang\Einaction>"C:\Program Files (x86)\erl5.8.2\bin\erl.exe" -name demo@192.168.1.123 -setcookie 123
Eshell V5.8.2 (abort with ^G)
(demo@192.168.1.123)1> P= spawn(looper,loop2,[]).  %使用loop2创建进程
<0.38.0>
(demo@192.168.1.123)2> P!abcd.                    %快速输入下面的命令向进程发送无法匹配的垃圾进程
abcd
(demo@192.168.1.123)3> P!abcde.
abcde
(demo@192.168.1.123)4> P!abcdef.
abcdef
(demo@192.168.1.123)5> P!abcdefg.
abcdefg
(demo@192.168.1.123)6> P!abcdefgg.
abcdefgg
(demo@192.168.1.123)7> erlang:process_info(P).Receive abcd      %我们输入完erlang:process_info(P).的时候恰好遇到了清理逻辑执行
(demo@192.168.1.123)7clear .
(demo@192.168.1.123)7>
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,4},
{messages,[abcde,abcdef,abcdefg,abcdefgg]}, %%除了已经被移除掉的abcd 其它垃圾消息还堆积在进程中
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,39},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)8Receive abcde
(demo@192.168.1.123)8clear .
(demo@192.168.1.123)8> erlang:process_info(P). %上面又执行了一次清理逻辑,我们再次查看进程信息
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,3},
{messages,[abcdef,abcdefg,abcdefgg]},  %%看到了吧,又少了一条消息垃圾消息abcde
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,69},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)9> Receive abcdef
(demo@192.168.1.123)9> clear .
(demo@192.168.1.123)9> erlang:process_info(P).
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,2},
{messages,[abcdefg,abcdefgg]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,99},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)10> P!abc.
Receive abc.
abc
(demo@192.168.1.123)11> erlang:process_info(P).
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,2},
{messages,[abcdefg,abcdefgg]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,115},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)12> Receive abcdefg
(demo@192.168.1.123)12> clear .
(demo@192.168.1.123)12> Receive abcdefgg
(demo@192.168.1.123)12> clear .
(demo@192.168.1.123)12> erlang:process_info(P).
[{current_function,{looper,loop2,0}},
{initial_call,{looper,loop2,0}},
{status,waiting},
{message_queue_len,0},
{messages,[]},                   %好了,执行到这里消息队列已经清空
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,175},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)13>

 

 从消息队列中提取消息的慢镜头

  receive原语执行过程中,遇到匹配的消息,提取消息进行处理的过程稍纵即逝,我们现在通过添加sleep,来看看这个过程的慢镜头.注意下面代码的修改

-module(looper).

-compile(export_all).

     

%% P= spawn(looper,loop3,[]).      
%% erlang:process_info(P).
%%      
loop3() -> 
     receive 
             abc -> 
                  io:format("Receive abc. ~n "),
                  timer:sleep(10000),
                  io:format("sleep after receive abc done. ~n "),
                  loop3();
        stop-> 
                  io:format("stop"),
                  stop
         after 25000 -> 
             receive 
                      Any ->
                               io:format("Receive ~p ~n ",[ Any])
              end,                 
                    io:format("clear . ~n "),
                    loop3()
      end.                  

下面的shell中,我们向进程发送了一批可以正常处理的abc消息,但是由于处理逻辑中的sleep,消息提取会被拖慢,这个时间我们可以执行process_info

 

D:\Erlang\Einaction>"C:\Program Files (x86)\erl5.8.2\bin\erl.exe" -name demo@192.168.1.123 -setcookie 123
Eshell V5.8.2 (abort with ^G)
(demo@192.168.1.123)1> P= spawn(looper,loop3,[]).
<0.38.0>
(demo@192.168.1.123)2> erlang:process_info(P).
[{current_function,{looper,loop3,0}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,0},
{messages,[]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,1},
{reductions,9},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)3> P!abc.  
Receive abc.
abc
(demo@192.168.1.123)4> P!abc.
abc
(demo@192.168.1.123)5> P!abc.
abc
(demo@192.168.1.123)6> P!abc.
abc
(demo@192.168.1.123)7> P!abc.
abc
(demo@192.168.1.123)8> P!abc.
abc
(demo@192.168.1.123)9> P!abc.
abc
(demo@192.168.1.123)10> P!abc.
abc
(demo@192.168.1.123)11> P!abcd.
abcd
(demo@192.168.1.123)12> P!abcdd.
abcdd
(demo@192.168.1.123)13> erlang:process_info(P).sleep after receive abc done.
(demo@192.168.1.123)13> Receive abc.
(demo@192.168.1.123)13>
[{current_function,{timer,sleep,1}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,8},
{messages,[abc,abc,abc,abc,abc,abc,abcd,abcdd]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,3},
{reductions,66},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)14> erlang:process_info(P).
[{current_function,{timer,sleep,1}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,8},
{messages,[abc,abc,abc,abc,abc,abc,abcd,abcdd]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,3},
{reductions,66},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)15> sleep after receive abc done.
(demo@192.168.1.123)15> Receive abc.
(demo@192.168.1.123)15> sleep after receive abc done.
(demo@192.168.1.123)15> Receive abc.
(demo@192.168.1.123)15> erlang:process_info(P).
[{current_function,{timer,sleep,1}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,6},
{messages,[abc,abc,abc,abc,abcd,abcdd]},
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,233},
{heap_size,233},
{stack_size,3},
{reductions,131},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,0}]},
{suspending,[]}]
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abc.
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abc.
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abc.
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abc.
(demo@192.168.1.123)16> sleep after receive abc done.
(demo@192.168.1.123)16> Receive abcd
(demo@192.168.1.123)16> clear .
(demo@192.168.1.123)16> erlang:process_info(P).
[{current_function,{looper,loop3,0}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,1},
{messages,[abcdd]},  %执行到这里只有一条垃圾数据堆积了
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,610},
{heap_size,233},
{stack_size,1},
{reductions,305},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,2}]},
{suspending,[]}]
(demo@192.168.1.123)17> Receive abcdd
(demo@192.168.1.123)17> clear .
(demo@192.168.1.123)17> erlang:process_info(P).
[{current_function,{looper,loop3,0}},
{initial_call,{looper,loop3,0}},
{status,waiting},
{message_queue_len,0},
{messages,[]},  %%消息队列已经清空
{links,[]},
{dictionary,[]},
{trap_exit,false},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.29.0>},
{total_heap_size,610},
{heap_size,233},
{stack_size,1},
{reductions,335},
{garbage_collection,[{min_bin_vheap_size,46368},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,2}]},
{suspending,[]}]
(demo@192.168.1.123)18>

  日出而作子夜还,偶得周末半日闲,各位周末愉快!

 

2012-4-28 17:45补充

Process Data 
For each process there will be at least one =proc_stack and one =proc_heap tag followed by the raw memory information for the stack and heap of the process. 
For each process there will also be a =proc_messages tag if the process' message queue is non-empty and a =proc_dictionary tag if the process' dictionary (the put/2 and get/1 thing) is non-empty.

The raw memory information can be decoded by the Crashdump Viewer tool. You will then be able to see the stack dump, the message queue (if any) and the dictionary (if any). The stack dump is a dump of the Erlang process stack. Most of the live data (i.e., variables currently in use) are placed on the stack; thus this can be quite interesting. One has to "guess" what's what, but as the information is symbolic, thorough reading of this information can be very useful. As an example we can find the state variable of the Erlang primitive loader on line (5) in the example below:

(1) 3cac44 Return addr 0x13BF58 (<terminate process normally>) 
(2) y(0) 
["/view/siri_r10_dev/clearcase/otp/erts/lib/kernel/ebin","/view/siri_r10_dev/ 
(3) clearcase/otp/erts/lib/stdlib/ebin"] 
(4) y(1) <0.1.0> 
(5) y(2) 
{state,[],none,#Fun<erl_prim_loader.6.7085890>,undefined,#Fun<erl_prim_loader.7.900
0327>,#Fun<erl_prim_loader.8.116480692>,#Port<0.2>,infinity,#Fun<erl_prim_loader.9.
10708760>} 
(6) y(3) infinity 
When interpreting the data for a process, it is helpful to know that anonymous function objects (funs) are given a name constructed from the name of the function in which they are created, and a number (starting with 
0) indicating the number of that fun within that function.

 

2012-08-30 16:23 更新

 
      如果需要按照优先级收发消息,可以使用二叉堆(min_heap)或者gb_trees,接收到的消息填充到这样的结构里面(把优先级数值放在第一个key用来排序).使用的时候只需要检索最小或者最大值就可以了.大部分情况下这种方法都可以实现按优先级接受消息,但收到大量高优先级消息的情况就会变慢;从R14A开始,Erlang编译器有一个优化减少了消息接收时进程之间的反复通信,在消息通信之初会创建一个reference附加在往来的消息中,这样在reference创建成功之前自动过滤掉所有不包含这个Reference特征的消息.
 
复制代码
%% optimized in R14A
optimized(Pid) ->
    Ref = make_ref(),
    Pid ! {self(), Ref, hello},
    receive
        {Pid, Ref, Msg} ->
            io:format("~p~n", [Msg])
    end.
复制代码
目录
相关文章
|
消息中间件 Linux Docker
RabbitMQ: /var/lib/rabbitmq/.erlang.cookie must be accessible by owner only
RabbitMQ: /var/lib/rabbitmq/.erlang.cookie must be accessible by owner only
1885 0
RabbitMQ: /var/lib/rabbitmq/.erlang.cookie must be accessible by owner only
|
消息中间件 PHP
laravel6 使用rabbitmq报错:Call to a member function make() on null at Queue\\Jobs\\Job.php:215
laravel6 使用rabbitmq报错:Call to a member function make() on null at Queue\\Jobs\\Job.php:215
193 0
|
Shell 自然语言处理 网络协议
|
API 数据建模 计算机视觉