伍哥原创之rabbitmq在豆荚商城的应用

简介: 【伍哥原创】1,前言RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。

【伍哥原创】

1,前言

RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。

AMQP 里主要要说两个组件:Exchange 和 Queue (在 AMQP 1.0 里还会有变动),如下图所示,绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker ,这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型:

RabbitMQ作为一个非常成熟的消息队列技术方案,也应用到了豆荚商城项目里面。

2,邮件服务:将慢动作从请求中分离出来

作为一个商城,自然少不了给用户发送邮件。比如注册的时候要发送确认邮件,下单以后发送订单邮件,推广信息也需要发送邮件,类似的情况非常多。
实现邮件发送的通常做法比较简单,就是在HTTP请求中一并完成邮件发送这个动作。而发送邮件依赖于SMTP服务。在小并发的环境下,一切都工作的很正常。

但是,当并发请求上到一定的程度,问题就来了。HTTP必须等待SMTP这个慢动作,如果你需要带附件的话,情况就更糟糕了。
另外一个问题来自于SMTP,当请求过于频密的时候,SMTP就出现超负荷工作的情况,这样各种邮件发送的异常情况就在所难免了。

怎么才能很好的解决这个问题呢?
答案在前面就给出了,就是建立消息队列机制!
其实原理非常简单,就是在内存维护一个队列(queue),如果要发送一封邮件,就往队列里面写一条消息,也就是所谓的信息生产者。
再建立一个进程,处理队列里面的邮件发送,就是所谓的信息消费者。

由于商城是用PHP开发的,所以就需要支持amqp的PHP客户端代码。这里用的是php amqplib (http://code.google.com/p/php-amqplib/)。
首先是连接rabbitmq,获取一个通道,然后是发送消息,最后断开通道和连接。下面是代码示例:

1
2
3
4
5
6
7
8
9
10
11
$queue = 'mail_queue'
$conn = new AMQPConnection( $config [ 'host' ], $config [ 'port' ], $config [ 'user' ], $config [ 'pass' ]);
$channel = $conn ->channel();
$channel ->queue_declare( $queue , false, true, false, false);
$send_data = serialize( $send_msg ); //数据先序列化一下,也可以使用JSON格式化
$msg = new AMQPMessage( $send_data ,
         array ( 'delivery_mode' => 2) //让消息持久化
     );
$channel ->basic_publish( $msg , '' , $queue );
$channel ->close();
$conn ->close();

在项目里当然不能这样写,应该封装成一个分布式服务接口,融入到整个系统代码架构里面,方便其他地方,比如controller,model的使用。

接下来是实现消息的消费程序。这里用的是python的pika。
首先是连接rabbitmq,获取通道,开始消费队列里面的信息。以下的代码写在类里面:

1
2
3
4
5
6
7
self .connection = pika.BlockingConnection(pika.ConnectionParameters(host = self .rmq_host))
self .channel = self .connection.channel()
self .channel.queue_declare(queue = self .rmq_queue, durable = True )
self .channel.basic_qos(prefetch_count = 1 )
# callback里面就是具体处理消息的地方
self .channel.basic_consume( self .callback, queue = self .rmq_queue)
self .channel.start_consuming()

callback回调

1
2
3
4
5
6
7
8
9
10
11
def callback( self , ch, method, properties, body):
         time.sleep( 1 ) #休息一秒才发送邮件
     msg = phpserialize.loads(body) #按PHP的格式做反序列化
     validateutil = ValidateUtil()       
     if validateutil.isEmail(msg[ 'mail_to' ]):
         mail = Mailer()
         mail.setMailTo(msg[ 'mail_to' ])
         mail.setMailSubject(msg[ 'mail_subject' ])
         mail.setMailHtmlBody(msg[ 'mail_body' ])
         mail.sendEmail()
     ch.basic_ack(delivery_tag = method.delivery_tag)

这里只是骨干代码。应该建立一个python的project,在eclipse(加PyDev)里面管理起来。
你还需要用到:配置文件以及配置文件解析库,系统日志,Mailer,phpserialize,ValidateUtil等等辅助类库。
关于PyDev请参考:http://www.ibm.com/developerworks/cn/opensource/os-cn-ecl-pydev/
熟悉了邮件的应用,后面扩展到手机短信通知服务、站内通信消息等等就非常方便了。当然,面对这样的需求,我们就需要在实现时考虑使用可扩展的消息队列的模型了。

3,页面访问统计:通过写缓存减轻DB的负载

对于商城来说,都有商品推荐的功能,比如人气商品推荐。怎么定义人气呢?一般看商品页面的访问量。这里就出现了页面统计的需求了。统计的数据一般需要持久化到DB。

一般来说,某商品页面被访问一次,就应该插入或者更新一次DB记录。这完全没有什么技术难度。
然而当并发连接上到一定水平,DB的性能问题就出来了。因为DB,比如MYSQL,都有一定的锁机制。当出现频繁的insert或者update时,select的速度自然就受到很大制约了。而且打开一次页面就触发一次统计,也就要操作一次DB,那DB不哭才怪!

有见及此,我们就通过消息队列实现了页面访问统计的写缓存。

何谓写缓存?对于某些不需要高实时的数据,比如我们这里的页面访问统计,可以把更新操作先缓存起来,当累积到一定程度时,才进行一次实际的更新。这样的好处是显而易见的,DB操作少了很多,而且也避免的DB锁机制引发的性能问题。

实现写缓存的方式有很多,比如通过memcached来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$page = 'goods_100052' ;
$memcache = memcache_connect( '192.168.1.100' , 11711);
// 通过memcached提供的原子加操作,避免并发访问带来的统计出错.
$count = $memcache ->increment( $page , 1);
if (! $count ) {
     $memcache ->add( $page , 1, false, 0);
     exit ;
}
if ( $count >= 1000) {
     $sql = "update `goods_viewlog` set `count` = `count`+{$count} where `page` = $page" ;
     $result = $mysql ->query( $sql );
     if ( $result ) {
         // 更新成功后,把缓存统计清零
         $memcache ->set( $page , 0, false, 0);
     }
}

我们这里采用了消息队列的实现方式。
消息生产者代码和消息消费者代码和上面介绍的邮件是几乎一样的。唯一不同在于回调函数那里。这里就不再重复说明了。

4,总结

我们在开发消息队列应用特别要注意的是要先搞清楚消息队列的主要概念和机制:比如交换,队列,绑定,持久化等等。
搞清楚了以后,再根据具体的应用类型,定义好消息队列模型。
具体可以参考伍哥前面的文章

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
传感器 监控 物联网
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
209 3
|
3月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
36 1
|
6月前
|
消息中间件 存储 网络协议
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
|
3月前
|
网络协议 Go 数据安全/隐私保护
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
268 2
|
8月前
|
消息中间件 运维 Java
RocketMQ的常规运维实践应用
RocketMQ的常规运维实践应用
490 1
|
4月前
|
消息中间件 监控 负载均衡
Kafka高级应用:如何配置处理MQ百万级消息队列?
在大数据时代,Apache Kafka作为一款高性能的分布式消息队列系统,广泛应用于处理大规模数据流。本文将深入探讨在Kafka环境中处理百万级消息队列的高级应用技巧。
188 0
|
5月前
|
消息中间件 缓存 NoSQL
[中间件] 秒杀系统秒杀率提高300%?教你如何利用redis和rabbitmq 优化应用!
[中间件] 秒杀系统秒杀率提高300%?教你如何利用redis和rabbitmq 优化应用!
174 0
|
5月前
|
消息中间件 Java
RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关
RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关
105 0
|
6月前
|
消息中间件 网络协议 物联网
Golang微服务框架Kratos应用MQTT消息队列
MQTT 协议 是由`IBM`的`Andy Stanford-Clark博士`和`Arcom`(已更名为Eurotech)的`Arlen Nipper博士`于 1999 年发明,用于石油和天然气行业。工程师需要一种协议来实现最小带宽和最小电池损耗,以通过卫星监控石油管道。最初,该协议被称为消息队列遥测传输,得名于首先支持其初始阶段的 IBM 产品 MQ 系列。2010 年,IBM 发布了 MQTT 3.1 作为任何人都可以实施的免费开放协议,然后于 2013 年将其提交给结构化信息标准促进组织 (OASIS) 规范机构进行维护。2019 年,OASIS 发布了升级的 MQTT 版本 5。
51 0
|
6月前
|
消息中间件 存储 中间件
Golang微服务框架Kratos应用RabbitMQ消息队列
RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。
90 1