RabbitMQ 的监控

简介: 上两篇文章介绍了:Mac 环境下 RabbitMQ 的安装RabbitMQ 的六种工作模式(附 Python 代码)

上两篇文章介绍了:

接下来说说监控的相关内容。

监控还是非常重要的,特别是在生产环境。磁盘满了,队列积压严重,如果我们还不知道,老板肯定会怀疑,莫不是这家伙要跑路?

而且我现在就遇到了这样的情况,主要是队列积压的问题。由于量不是很大,所以磁盘空间倒不是很担心,但有时程序执行会报错,导致队列一直消费不下去,这就很让人尴尬了。

查了一些资料,总结了一下。想要了解 RabbitMQ 的运行状态,主要有三种途径:Management UI,rabbitmqctl 命令和 REST API。


Management UI


1.png


RabbitMQ 给我们提供了丰富的 Web 管理功能,通过页面,我们能看到 RabbitMQ 的整体运行状况,交换机和队列的状态等,还可以进行人员管理和权限配置,相当全面。

但如果想通过页面来监控,那出不出问题只能靠缘分。看到出问题了,是运气好,看不到出问题,那是必然。

这也是我当前的现状,所以为了避免出现大问题,得赶紧改变一下。

备注:通过 http://127.0.0.1:15672 来访问 Web 页面,默认情况下用户名和密码都是 guest,但生产环境下都应该改掉的。


rabbitmqctl 命令


与前端页面对应的就是后端的命令行命令了,同样非常丰富。平时自己测试,或者临时查看一些状态时,也能用得上。但就我个人使用感觉来说,用的并不是很多。

我总结一些还算常用的,列在下面,大家各取所需:


# 启动服务
rabbitmq-server
# 停止服务
rabbitmqctl stop
# vhost 增删查
rabbitmqctl add_vhost
rabbitmqctl delete_vhost
rabbitmqctl list_vhosts
# 查询交换机
rabbitmqctl list_exchanges
# 查询队列
rabbitmqctl list_queues
# 查看消费者信息
rabbitmqctl list_consumers
# user 增删查
rabbitmqctl add_user
rabbitmqctl delete_user
rabbitmqctl list_users
复制代码


REST API


终于来到重点了,对于程序员来说,看到有现成的 API 可以调用,那真是太幸福了。

自动化监控和一些需要批量的操作,通过调用 API 来实现是最好的方式。比如有一些需要初始化的用户和权限,就可以通过脚本来一键完成,而不是通过页面逐个添加,简单又快捷。


下面是一些常用的 API:


# 概括信息
curl -i -u guest:guest http://localhost:15672/api/overview
# vhost 列表
curl -i -u guest:guest http://localhost:15672/api/vhosts
# channel 列表
curl -i -u guest:guest http://localhost:15672/api/channels
# 节点信息
curl -i -u guest:guest http://localhost:15672/api/nodes
# 交换机信息
curl -i -u guest:guest http://localhost:15672/api/exchanges
# 队列信息
curl -i -u guest:guest http://localhost:15672/api/queues
复制代码


就我现在遇到的情况来说,overviewqueues 这两个 API 就可以满足我的需求,大家也可以根据自己项目的实际情况来选择。

API 返回内容是 json,而且字段还是挺多的,刚开始看会感觉一脸懵,具体含义对照官网的解释和实际情况来慢慢琢磨,弄懂也不是很困难。

下面代码包含了 API 请求以及返回结果的解析,可以在测试环境下执行,稍加更改就可以应用到生产环境。


import json
import logging
import optparse
import requests
logging.basicConfig(
    format='%(asctime)s - %(pathname)s[%(lineno)d] - %(levelname)s: %(message)s',
    level=logging.INFO)
logger = logging.getLogger(__name__)
class RabbitMQMoniter(object):
    """
    RabbitMQ Management API
    """
    def __init__(self, host='', port=15672, username='guest', password='guest'):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
    def call_api(self, path):
        logger.info('call rabbit api to get data on ' + path)
        headers = {'content-type': 'application/json'}
        url = '{0}://{1}:{2}/api/{3}'.format('http', self.host, self.port, path)
        res = requests.get(url, headers=headers, auth=(self.username, self.password))
        return res.json()
    def list_queues(self):
        """
        curl -i -u guest:guest http://localhost:15672/api/queues  
        return: list
        """
        queues = []
        for queue in self.call_api('queues'):
            element = {
                'vhost': queue['vhost'],
                'queue': queue['name']
            }
            queues.append(element)
            logger.info('get queue ' + queue['vhost'] + '/' + queue['name'])
        return queues
    def list_nodes(self):
        """
        curl -i -u guest:guest http://localhost:15672/api/nodes
        return: list
        """
        nodes = []
        for node in self.call_api('nodes'):
            name = node['name'].split('@')[1]
            element = {
                'node': name,
                'node_type': node['type']
            }
            nodes.append(element)
            logger.info('get nodes ' + name + '/' + node['type'])
        return nodes
    def check_queue(self):
        """
        check queue
        """
        for queue in self.call_api('queues'):
            self._get_queue_data(queue)
        return True
    def _get_queue_data(self, queue):
        """
        get queue data
        """
        for item in ['memory', 'messages', 'messages_ready', 'messages_unacknowledged', 'consumers']:
            key = 'rabbitmq.queues[{0},queue_{1},{2}]'.format(queue['vhost'], item, queue['name'])
            value = queue.get(item, 0)
            logger.info('queue data: - %s %s' % (key, value))
        for item in ['deliver_get', 'publish']:
            key = 'rabbitmq.queues[{0},queue_message_stats_{1},{2}]'.format(queue['vhost'], item, queue['name'])
            value = queue.get('message_stats', {}).get(item, 0)
            logger.info('queue data: - %s %s' % (key, value))
    def check_aliveness(self):
        """
        check alive
        """
        return self.call_api('aliveness-test/%2f')['status']
    def check_overview(self, item):
        """
        check overview
        """
        if item in ['channels', 'connections', 'consumers', 'exchanges', 'queues']:
            return self.call_api('overview').get('object_totals').get(item, 0)
        elif item in ['messages', 'messages_ready', 'messages_unacknowledged']:
            return self.call_api('overview').get('queue_totals').get(item, 0)
        elif item == 'message_stats_deliver_get':
            return self.call_api('overview').get('message_stats', {}).get('deliver_get', 0)
        elif item == 'message_stats_publish':
            return self.call_api('overview').get('message_stats', {}).get('publish', 0)
        elif item == 'message_stats_ack':
            return self.call_api('overview').get('message_stats', {}).get('ack', 0)
        elif item == 'message_stats_redeliver':
            return self.call_api('overview').get('message_stats', {}).get('redeliver', 0)
        elif item == 'rabbitmq_version':
            return self.call_api('overview').get('rabbitmq_version', 'None')
    def check_server(self, item, node_name):
        """
        check server
        """
        node_name = node_name.split('.')[0]
        for nodeData in self.call_api('nodes'):
            if node_name in nodeData['name']:
                return nodeData.get(item, 0)
        return 'Not Found'
def main():
    """
    Command-line
    """
    choices = ['list_queues', 'list_nodes', 'queues', 'check_aliveness', 'overview', 'server']
    parser = optparse.OptionParser()
    parser.add_option('--username', help='RabbitMQ API username', default='guest')
    parser.add_option('--password', help='RabbitMQ API password', default='guest')
    parser.add_option('--host', help='RabbitMQ API host', default='127.0.0.1')
    parser.add_option('--port', help='RabbitMQ API port', type='int', default=15672)
    parser.add_option('--check', type='choice', choices=choices, help='Type of check')
    parser.add_option('--metric', help='Which metric to evaluate', default='')
    parser.add_option('--node', help='Which node to check (valid for --check=server)')
    (options, args) = parser.parse_args()
    if not options.check:
        parser.error('At least one check should be specified')
    logger.info('start running ...')
    api = RabbitMQMoniter(username=options.username, password=options.password, host=options.host, port=options.port)
    if options.check == 'list_queues':
        logger.info(json.dumps({'data': api.list_queues()}, indent=4, separators=(',', ':')))
    elif options.check == 'list_nodes':
        logger.info(json.dumps({'data': api.list_nodes()}, indent=4, separators=(',', ':')))
    elif options.check == 'queues':
        logger.info(api.check_queue())
    elif options.check == 'check_aliveness':
        logger.info(api.check_aliveness())
    elif options.check == 'overview':
        if not options.metric:
            parser.error('Missing required parameter: "metric"')
        else:
            if options.node:
                logger.info(api.check_overview(options.metric))
            else:
                logger.info(api.check_overview(options.metric))
    elif options.check == 'server':
        if not options.metric:
            parser.error('Missing required parameter: "metric"')
        else:
            if options.node:
                logger.info(api.check_server(options.metric, options.node))
            else:
                logger.info(api.check_server(options.metric, api.host))
if __name__ == '__main__':
    main()
复制代码

调用及返回:

python3 rabbitmq_status.py --check list_queues
# 2020-04-12 14:33:15,298 - rabbitmq_status.py[142] - INFO: start running ...
# 2020-04-12 14:33:15,298 - rabbitmq_status.py[26] - INFO: call rabbit api to get data on queues
# 2020-04-12 14:33:15,312 - rabbitmq_status.py[46] - INFO: get queue //task_queue
# 2020-04-12 14:33:15,312 - rabbitmq_status.py[147] - INFO: {
#     "data":[
#         {
#             "vhost":"/",
#             "queue":"task_queue"
#         }
#     ]
# }
复制代码


通过对返回结果进行解析,就可以判断 RabbitMQ 的整体运行状态,如果发生超阈值的情况,可以发送告警或邮件,来达到监控的效果。

针对队列积压情况的监控判断,有两种方式:一是设置队列积压长度阈值,如果超过阈值即告警;二是保存最近五次的积压长度,如果积压逐渐增长并超阈值,即告警。

第二种方式更好,判断更加精准,误告可能性小,但实现起来也更复杂。

这里只是提一个思路,等后续再把实践结果和代码分享出来。或者大家有哪些更好的方法吗?欢迎留言交流。


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 Web App开发 监控
mqtt数据问题之如何实现webRTC 协议的监控视频压测
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
126 0
|
消息中间件 存储 缓存
RocketMQ 监控告警:生产环境如何快速通过监控预警发现堆积、收发失败等问题?
本文主要向大家介绍如何利用 RocketMQ 可观测体系中的指标监控,对生产环境中典型场景:消息堆积、消息收发失败等场景配置合理的监控预警,快速发现问题,定位问题。
1438 0
RocketMQ 监控告警:生产环境如何快速通过监控预警发现堆积、收发失败等问题?
|
消息中间件 存储 Prometheus
可视化界面工具可以用于管理和监控 Apache RocketMQ
可视化界面工具可以用于管理和监控 Apache RocketMQ
1983 3
|
消息中间件 监控 Shell
配置了RocketMQ的监控IP地址
配置了RocketMQ的监控IP地址
327 1
EMQ
|
数据采集 存储 Prometheus
EMQX+Prometheus+Grafana:MQTT 数据可视化监控实践
本文介绍了如何将EMQX 5.0的监控数据集成到Prometheus中,使用Grafana来展示EMQX的监控数据,并最终搭建出一个简单的MQTT数据可视化监控系统。
EMQ
646 0
EMQX+Prometheus+Grafana:MQTT 数据可视化监控实践
|
消息中间件 运维 监控
Rocketmq-console集群监控平台搭建|学习笔记
快速学习Rocketmq-console集群监控平台搭建
1024 0
Rocketmq-console集群监控平台搭建|学习笔记
|
消息中间件 监控 数据可视化
SpringCloud进阶:一文通透RabbitMQ服务监控
前面我们介绍了通过turbine直接聚合多个服务的监控信息,实现了服务的监控,但是这种方式有个不太好的地方就是turbine和服务的耦合性太强了,针对这个问题,我们可以将服务的监控消息发送到RabbitMQ中,然后turbine中RabbitMQ中获取获取监控消息,这样就实现类服务和turbine的解耦。 我们通过案例来演示下如何实现该效果 一、启动RabbitMQ服务 显然我们需要安装启动一个RabbitMQ服务 二、创建consumer服务 创建一个consumer服务,同时要将dashboard的监控信息发送到RabbitMQ服务中。 1.创建项目
SpringCloud进阶:一文通透RabbitMQ服务监控
|
消息中间件 监控 Java
第二章:RocketMQ集群监控平台 rocketmq-console 搭建
第二章:RocketMQ集群监控平台 rocketmq-console 搭建
337 0
第二章:RocketMQ集群监控平台 rocketmq-console 搭建
|
消息中间件 Prometheus 监控
prometheus+grafana监控rabbitmq
prometheus+grafana监控rabbitmq
809 0
prometheus+grafana监控rabbitmq
|
监控 Java 数据库
spring boot +RabbitMQ +InfluxDB+Grafara监控实践
本文需要有相关spring boot 或spring cloud 相关微服务框架的基础,如果您具备相关基础可以很容易的实现下述过程!!!!!!!   希望本文的所说对需要的您有所帮助   从这里我们开始进入闲聊阶段。
2852 0