离线数据推送问题(消息队列)

简介: 今天发生的问题:消息队列报错,实时消息没有发送成功,重启后问题消失。

每天晚上9点多我要起身下班的时候,抬眼看到周围还在公司的也就只有几个刚毕业的小鲜肉了,就感觉自己也好年轻啊。虽然不止感觉,其实本来也不老。但是转眼又一想,我晚饭都不吃,每天多比人家工作了好几个小时,但是水平的提高一点也不成正比啊,是不是方法不当啊。唯一成正比的,我确实把身体锻炼好了,骑自行车40公里来上班,中间不停,骑得飞快,整个路程总体还是往高处走的,到公司休息一下正常上班,一点儿感觉都没有。有一种无知叫做不知道什么事情是不可能的,想到就去做了。


  直到中午和我们同事90后MM去吃饭,她最近想在网上做些小生意。我才想到:女孩子最大的劣势在于专注。朋友圈里好多有正式工作的女孩在弄微商,我却没见过一个技术特别好的男同事在弄副业的。我可以同时做很多的事情,确实所有的事情都没有做深入到极致。人所能达到的境界不是和付出的时间成正比,而是和对自己的要求成正比。人的优秀程度是和野心成正比的。我的知足常乐的心,在生活中是件好事,但是工作中却是自己最大的挑战了。和之间同事聊天,他说每天进步一点点就好。我说如果我现在每天只是进步一点点,那么以后的进步就更慢了。这几年中没有一个质的飞越,过几年就更难冲破前进的壁垒,最后是自己以为自己在进步,别人看你只是原地打转。


  今天发生的问题:消息队列报错,实时消息没有发送成功,重启后问题消失。


1112728-20170407134442238-1191623360.png


继续看其他的错误日志:


1112728-20170407134829363-10216688.png


1112728-20170407135104800-1258919974.png


消息队列采用公司统一的apache qpidd集群。报错的lesocms.video.guoguang.queue这个消费队列。问题很清楚,生产者在我这边,消费者在搜索部门。生产的东西消费者没被消费掉,队列积压了。消费的问题不管是他们消费程序挂了还是消费慢,都已经交给搜索部门去处理了,我这边要解决遇到这种问题怎么处理。


  问题1:队列满了之后尝试了几次后close,只能靠人工重启重置连接的问题


  解决方法: 找到几个关键的异常点


Caused by: org.apache.qpid.transport.SessionException: timed out waiting for sync: complete = -1, point = 0
Exception when sending message:timed out waiting for sync: complete = -1, point = 0
Caused by: org.apache.qpid.transport.SessionClosedException: session closed
Caused by: javax.jms.JMSException: Exception when sending message:session closed


异常刚开始的日志:


1112728-20170407164603472-207317010.png


定位到刚开始异常的行:


1112728-20170407171157472-2063982789.png


找到异常前后的日志:


1112728-20170407173555144-1116146110.png


发现越往后的日志里,新增大量的 create


class:com.letv.mms.transmission.task.sub.SwiftSendMsgTask


1112728-20170407174206394-534831410.png


我专门查了一下 create class:com.letv.mms.transmission.task.sub.SwiftSendMsgTask的新建数量:


1112728-20170407174108832-1175440949.png


出问题的这天不断的新建,正常稳定的时候是没有新加的。SwiftSendMsgTask是我当初自己设计的一个对象连接池,目的在于如果消息的发送和正常向消息队列里组装消息是同步的,会造成第一实行性不能保证,因为有的专辑下面有几万个视频,必须组装成一个消息发送,这个消息组装就要好几分钟。第二,组装过程中数据库连接池等待时间过长会自动关闭。所以我就直接异步发消息,从对象连接池中取出一个处理发消息的处理对象扔进去,直接处理下一个。如果处理消息的空闲对象不够用我就直接新建一个放到连接池里。一直想好好总结一下离线数据的程序,因为这个程序整个架构基本上很原始,资源的调度分配都是程序自己控制的,基本没用什么现成的技术。细节处处处体现精巧,每个设计都解决了特定的问题,但是总体去说这个程序,我却很难把这个程序的独到之处用语言表达出来。言归正传:


记得有次开会,组里谁说线上出了什么什么问题,不过倒是没有异常。我不负责那个项目具体不知道,我只是笑着说:“那可能不是真的没有异常,而是异常日志没打好哦。”经常发现自己这句话说的很有道理。上面异常日志截图里面都把.cpp文件的异常都打印出来了,完全可以按图索骥。但是自己就是个写代码的自己知道,异常的说明文字未必准确,最好还是要查源码。查BasicMessageProducer的源码发现,首先这个session是AMQSession。那么它close了,为什么使用到的时候没新建?org.springframework.jms.connection.CachingConnectionFactory的源码里看到reconnectOnException默认是true,也就是说抛出了这个Jms异常理论上是会新建的,除非新建不成功,不成功是因为SwiftSendMsgTask的新建数量太多,超过了设定的<property name="sessionCacheSize" value="700"></property>。那么我要解决的就是SwiftSendMsgTask在异常后不要新建那么多的问题了。


  将原有的一个对象池分成两个,一个是无限制的对象池,使用时即创建。因为这个离线服务半夜有个跑全量的,我会起1000多个线程来跑,但是每次处理数据的线程池是50,因为这个环节要涉及大量CPU计算数据库连接,虽然是高配物理机,而且数据库是专门将线上数据实时复制的一个从库,专门使用(线程数不大于100的时候效率高)。但是是24核CPU,计算量大,线程数大于50会有CPU跑满的风险。但是每个线程会生成独立的数据文件,然后进行gz压缩。gz压缩很耗时,但是消耗的IO资源,释放了CPU,平时的时候跑全量时会存在600多个同时在压缩,所以对这个的对象池无限制。他们问我:为啥你的程序执行的那么快,我的数量小,反而慢了那么多?因为你拷贝完我那一版之后我改了代码[哭笑], 我把很多线程中不需要返回结果的,和大循环中的项都扔到另外的线程池里去啦。


  发消息的单独放到一个有限制的线程池里去管理。本来cacheSize是700,但是发现正常情况下就算数据量突增,100个都不解决问题的话(其实正常情况下会5个负责发消息的,因为消息体最大是4M,发消息是很快的,异步的,扔到exchange中即可,实时也没有什么并发量),媒资程序那边就挂了,异常不会到达这边,折中一下资源,将cacheSize设置为100。程序中创建对象的时候,如果对象池的activeNum个数超过或者等于91个(因为最多会有8个sleeping的),则不会再新建。配了日志报警,到达40个系统会给我发报警邮件。


  问题2:为什么数据量会突增


  答案:咨询了一下德伟:最近接了一批短视频。实时的量发生了剧增。所以消费的能力突然不够也是正常的。目前消费者有两个:一个专辑的,一个视频的。但是生产者只有一个,如果专辑或者视频一个发生了突增,会影响到另一个。另外,专辑有的消息体特别大,极端情况下,一个队列也就是能放100多个消息。所以决定将专辑和视频分开,已经和搜索部门的同事达成协议。并提醒他们将队列承载量采用最高配


                                                    (500M)


1112728-20170407183040832-1750796993.png

。因为发现他们那边现在不是这么做的[汗]。


  问题3:没有收到消息队列溢出的报警


  答案:咨询了管MQ集群的同事,报警没加上[汗]。

  

  相信问题解决到这个程度,下次再遇到这种问题,搜索的哥哥们下次就不会第一时间来找我了。下次沟通估计就是我出差回来给他们带吃哒[胜利][胜利]

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
消息中间件 SQL 存储
阿里云物联网平台设备数据转发到消息队列RocketMQ全链路测试
本文从物联网平台的产品及设备的创建开始,逐步介绍整个链路的完整实现。
阿里云物联网平台设备数据转发到消息队列RocketMQ全链路测试
|
消息中间件 监控 大数据
阿里云正式推出消息队列Kafka:兼容开源,数据可靠性99.999999%
7月25日,阿里云宣布正式推出消息队列Kafka,全面融合开源生态。在兼容Apache生态的基础上,阿里云消息队列Kafka彻底解决了开源产品稳定性不足的痛点,可用性达99.9%,数据可靠性99.999999%,并且支持消息无缝迁移到云上。
2104 0
|
消息中间件 大数据 Kafka
阿里云正式推出消息队列Kafka:兼容开源,数据可靠性99.999999%
7月25日,阿里云宣布正式推出消息队列Kafka,全面融合开源生态。在兼容Apache生态的基础上,阿里云消息队列Kafka彻底解决了开源产品稳定性不足的痛点,可用性达99.9%,数据可靠性99.999999%,并且支持消息无缝迁移到云上。
5682 0
|
消息中间件 测试技术 安全
生产环境消息队列ActiveMQ的数据积压优化过程
针对消息队列的数据积压问题,我们主要做了三个方面的优化处理,取消同步锁、ActiveMQ参数优化、本地双队列优化,通过这三个方面的优化基本解决了队列数据积压的问题。
4792 0
|
6天前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
7月前
|
消息中间件 存储 网络协议
企业实战(11)消息队列之Docker安装部署RabbitMQ实战
企业实战(11)消息队列之Docker安装部署RabbitMQ实战
141 0
|
6天前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
42 2
|
6天前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
82 0
|
6天前
|
消息中间件 JSON Java
RabbitMQ消息队列
RabbitMQ消息队列
46 0

热门文章

最新文章