[Erlang 0089] RabbitMQ Exchange

简介:

 之前提到了RabbitMQ是怎样维护Queue的data和metadata的.我们知道Queue在RabbitMQ对应Erlang的进程,那么Exchane是不是也是独立的Erlang进程呢?它的信息是如何维护的呢?

 

Exchange 本质上是什么

 

   印象中Vhost就像一个容器,Exchange Queue就像一个个零件,这些东西组装起来成为一个消息的Broker.真正实现的时候并不是每一个东西都会按照一一对应的方式设计实体,这里我想到两点:

  [1] 思考问题的时候恰当的隐喻能够帮我们快速理解问题,但是在某个时候要抛开隐喻,因为它和你要理解的东西毕竟是两个独立的东西;

  [2] 奥卡姆剃刀原则,如无必要勿增实体,在Erlang世界中创建进程的成本很低,往往会滥用这一点,把一些没有必要设计成独立进程的搞成了进程,Erlang的设计哲学是把独立的活动用进程表达.进程创建也是有限制的,最后一根稻草压死骆驼,雪崩的时候每一片雪花都不认为自己有责任,就是这个道理.

 

   Exchange在RabbitMQ就没有设计为进程而仅仅是单纯维护了exchange 名称以及相关的bingding规则.我们发送消息到Exchange本质上是:你连接到RabbitMQ的channel拿消息中的routing key和binding规则匹配完成消息的路由.换句话说,真正完成route逻辑的实体是channel,exchange只是一堆规则集,很容易在集群内部同步,所以没有queue面临的那些问题.下面是rabbit_channel的代码片段,我们可以窥见一斑:

  

复制代码
handle_method(#'basic.publish'{exchange    = ExchangeNameBin,
                               routing_key = RoutingKey,
                               mandatory   = Mandatory,
                               immediate   = Immediate},
              Content, State = #ch{virtual_host    = VHostPath,
                                   tx_status       = TxStatus,
                                   confirm_enabled = ConfirmEnabled,
                                   trace_state     = TraceState}) ->
    ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
    check_write_permitted(ExchangeName, State),
    Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
    check_internal_exchange(Exchange),
    %% We decode the content's properties here because we're almost
    %% certain to want to look at delivery-mode and priority.
    DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
    check_user_id_header(DecodedContent#content.properties, State),
    {MsgSeqNo, State1} =
        case {TxStatus, ConfirmEnabled} of
            {none, false} -> {undefined, State};
            {_, _}        -> SeqNo = State#ch.publish_seqno,
                             {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
        end,
    case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
        {ok, Message} ->
            rabbit_trace:tap_trace_in(Message, TraceState),
            Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message,
                                             MsgSeqNo),
            QNames = rabbit_exchange:route(Exchange, Delivery),
            {noreply,
             case TxStatus of
                 none        -> deliver_to_queues({Delivery, QNames}, State1);
                 in_progress -> TMQ = State1#ch.uncommitted_message_q,
                                NewTMQ = queue:in({Delivery, QNames}, TMQ),
                                State1#ch{uncommitted_message_q = NewTMQ}
             end};
        {error, Reason} ->
            rabbit_misc:protocol_error(precondition_failed,
                                       "invalid message: ~p", [Reason])
    end;
复制代码

 

  我们在RabbitMQ集群环境创建exchange实际上就是在集群内所有节点的exchange表增加一条数据.这样连接到所有节点都可以使用这个新exchange.

 

  要是发送到channel的消息还没有完成路由节点就当掉了怎么办?

 

  basic.public AMQP命令并没有返回消息的状态.这就意味节点当掉的时候,channel还会继续routing消息.producer会继续发布消息,这样就有丢消息的风险.

 

  解决方案有两个:[1] 使用AMQP transaction (AMQP事务):producer保持阻塞状态直到消息被路由到queue.[2] 使用publisher confirm 来跟踪节点当掉的时候哪些消息还没有确认.

  这两种方法都可以帮助我们检测消息是否到达目的地(比如节点当掉).

  下面我们看看exchange在erlang node里面是怎么维护的:

 

Exchange in Erlang node

 
  通过ets:i()查看节点的ETS表,能够看到一系列Rabbit前缀的表:
 
复制代码
rabbit_durable_exchange rabbit_durable_exchange set   9      507      mnesia_monitor
rabbit_durable_queue rabbit_durable_queue set   2      335      mnesia_monitor
rabbit_durable_route rabbit_durable_route set   4      445      mnesia_monitor
rabbit_exchange rabbit_exchange   set   9      507      mnesia_monitor
rabbit_exchange_serial rabbit_exchange_serial set   0      283      mnesia_monitor
rabbit_listener rabbit_listener   bag   1      321      mnesia_monitor
rabbit_queue    rabbit_queue      set   2      335      mnesia_monitor
rabbit_registry rabbit_registry   set   7      353      rabbit_registry
rabbit_reverse_route rabbit_reverse_route ordered_set 4      239      mnesia_monitor
rabbit_route    rabbit_route      ordered_set 4      239      mnesia_monitor
rabbit_semi_durable_route rabbit_semi_durable_route ordered_set 4      239      mnesia_monitor
rabbit_topic_trie_binding rabbit_topic_trie_binding ordered_set 0      73       mnesia_monitor
rabbit_topic_trie_edge rabbit_topic_trie_edge ordered_set 0      73       mnesia_monitor
rabbit_topic_trie_node rabbit_topic_trie_node ordered_set 0      73       mnesia_monitor
rabbit_user     rabbit_user       set   1      302      mnesia_monitor
rabbit_user_permission rabbit_user_permission set   1      315      mnesia_monitor
rabbit_vhost    rabbit_vhost      set   1      294      mnesia_monitor
复制代码

 

 
  下面我们看几个我们比较关注的ets表
 
  rabbit_exchange :
 
复制代码
(default@zen.com)37> ets:match(rabbit_exchange,'$1').
[[{exchange,{resource,<<"/">>,exchange,
                      <<"amq.rabbitmq.trace">>},
            topic,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,
                      <<"amq.rabbitmq.log">>},
            topic,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"qp_pic_exchange">>},
            direct,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.match">>},
            headers,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.headers">>},
            headers,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.topic">>},
            topic,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.direct">>},
            direct,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.fanout">>},
            fanout,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<>>},
            direct,true,false,false,[],undefined}]]
复制代码

 

 
rabbit_queue
 
复制代码
(default@zen.com)32> ets:match(rabbit_queue,'$1').
[[#amqqueue{name = {resource,<<"/">>,queue,
                             <<"zen_qp_pic_queue">>},
            durable = true,auto_delete = false,exclusive_owner = none,
            arguments = [],pid = <0.396.0>,slave_pids = [],
            mirror_nodes = undefined}],
[#amqqueue{name = {resource,<<"/">>,queue,
                             <<"qp_pic_queue2">>},
            durable = true,auto_delete = false,exclusive_owner = none,
            arguments = [],pid = <0.397.0>,slave_pids = [],
            mirror_nodes = undefined}]]
(default@zen.com)33> process_info(pid(0,396,0)).
[{current_function,{erlang,hibernate,3}},
{initial_call,{proc_lib,init_p,5}},
{status,waiting},
{message_queue_len,0},
{messages,[]},
{links,[#Port<0.2723>,<0.204.0>]},
{dictionary,[{{"/var/lib/rabbitmq/mnesia/rabbit@localhost/queues/48ROU6Q8HVQVRA1PVZQ38TZLK/journal.jif",
                fhc_file},
               {file,1,true}},
              {{xtype_to_module,direct},rabbit_exchange_type_direct},
              {credit_blocked,[]},
              {{credit_from,<0.198.0>},1998},
              {'$ancestors',[rabbit_amqqueue_sup,rabbit_sup,<0.110.0>]},
              {{#Ref<0.0.0.4414>,fhc_handle},
               {handle,{file_descriptor,prim_file,{#Port<0.2723>,18}},
                       0,false,0,infinity,[],true,
                       "/var/lib/rabbitmq/mnesia/rabbit@localhost/queues/48ROU6Q8HVQVRA1PVZQ38TZLK/journal.jif",
                       [write,binary|...],
                       [{...}],
                       true,...}},
              {fhc_age_tree,{1,
                             {{1352,908329,782846},#Ref<0.0.0.4414>,nil,nil}}},
              {guid,{{2942931266,2608231844,2036421255,3742291922},1}},
              {'$initial_call',{gen,init_it,6}}]},
{trap_exit,true},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.109.0>},
{total_heap_size,1709},
{heap_size,1709},
{stack_size,0},
{reductions,8669},
{garbage_collection,[{min_bin_vheap_size,46368},
                      {min_heap_size,233},
                      {fullsweep_after,65535},
                      {minor_gcs,10}]},
{suspending,[]}]
复制代码

 

 rabbit_route

复制代码
(default@zen.com)35> ets:match(rabbit_route,'$1').
[[{route,{binding,{resource,<<"/">>,exchange,<<>>},
                  <<"qp_pic_queue2">>,
                  {resource,<<"/">>,queue,<<"qp_pic_queue2">>},
                  []},
         const}],
[{route,{binding,{resource,<<"/">>,exchange,<<>>},
                  <<"zen_qp_pic_queue">>,
                  {resource,<<"/">>,queue,<<"zen_qp_pic_queue">>},
                  []},
         const}],
[{route,{binding,{resource,<<"/">>,exchange,
                            <<"qp_pic_exchange">>},
                  <<"qp_pic2">>,
                  {resource,<<"/">>,queue,<<"qp_pic_queue2">>},
                  []},
         const}],
[{route,{binding,{resource,<<"/">>,exchange,
                            <<"qp_pic_exchange">>},
                  <<"qp_pic2">>,
                  {resource,<<"/">>,queue,<<"zen_qp_pic_queue">>},
                  []},
         const}]]
复制代码

 

 

technology.rabbitmq.ebook
sport.golf.paper
sport.tennis.ebook

 

Using the topic exchange, the subscription/binding key specified in step 5 can be a sequence 

of dot-separated words and/or wildcards. AMQP wildcards are just:
 #  This matches zero or more words
 *  This matches exactly one word


So, for example:
 #.ebook and *.*.ebook both match the first and the third sent messages
 sport.# and sport.*.* both match the second and the third sent messages
 # alone matches any message sent

 

 

最后小图一张 Emma Watson 我们俩都是O型血 Petrificus Totalus!

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
9月前
|
消息中间件 存储 Java
RabbitMQ之Direct(直连)Exchange解读
RabbitMQ之Direct(直连)Exchange解读
|
28天前
|
消息中间件
Rabbitmq与Erlang对应版本关系
Rabbitmq与Erlang对应版本关系
20 0
|
2月前
|
消息中间件 Shell
rabbitmq安装erlang环境后没生效
rabbitmq安装erlang环境后没生效
979 7
|
2月前
|
消息中间件 Java Kafka
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
RabbitMQ安装和5种不同的消息模型(BasicQueue,WorkQueue,Fanout Exchange,Direct Exchange,Topic Exchange)与SpringAMQP
|
2月前
|
消息中间件 Java
RabbitMQ中的Exchange是什么?它有哪些类型?
RabbitMQ中的Exchange是什么?它有哪些类型?
51 0
|
9月前
|
消息中间件 存储 Java
RabbitMQ之Exchange(交换机)属性及备用交换机解读
RabbitMQ之Exchange(交换机)属性及备用交换机解读
|
9月前
|
消息中间件 存储 Java
RabbitMQ之Fanout(扇形) Exchange解读
RabbitMQ之Fanout(扇形) Exchange解读
|
消息中间件 存储
RabbitMQ从入门到进阶(Direct exchange)
RabbitMQ从入门到进阶(Direct exchange)
158 0
|
消息中间件 存储
RabbitMQ从入门到进阶(Exchange)
RabbitMQ从入门到进阶(Exchange)
130 0
|
消息中间件 网络架构
9、RabbitMQ教程-Topic Exchange类型的基本使用demo
9、RabbitMQ教程-Topic Exchange类型的基本使用demo
119 0
9、RabbitMQ教程-Topic Exchange类型的基本使用demo