Prometheus-钉钉告警

本文涉及的产品
可观测监控 Prometheus 版,每月50GB免费额度
简介: Prometheus-钉钉告警

钉钉告警

global:
  resolve_timeout: 10m

route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 5m
  receiver: 'webhook1'
receivers:
- name: 'webhook1'  # 这里对应 prometheus-webhook-dingtalk 这个配置文件里
  webhook_configs:
  - url: 'http://x.x.x.x:8060/dingtalk/webhook1/send'
  - url: 'http://x.x.x.x:8066/wechat/webhook1/send'
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
实现调用钉钉机器人接口发送告警信息到IT群
'''

import time
import requests
import json
import sys
import logging
import re

import hmac
import hashlib
import base64
import urllib.parse


from flask import Flask, request, jsonify


app = Flask(__name__)


class CheckJSON():
    def getkeys(self, data):
        keys_all_list = []

        def getkeys(data):
            if (type(data) == type({})):
                keys = data.keys()
                for key in keys:
                    value = data.get(key)
                    if (type(value) != type({}) and type(value) != type([])):
                        keys_all_list.append(key)
                    elif (type(value) == type({})):
                        keys_all_list.append(key)
                        getkeys(value)
                    elif (type(value) == type([])):
                        keys_all_list.append(key)
                        for para in value:
                            if (type(para) == type({})
                                    or type(para) == type([])):
                                getkeys(para)
                            else:
                                keys_all_list.append(para)

        getkeys(data)
        return keys_all_list

    def is_exists(self, data, tagkey):
        if (type(data) != type({})):
            print('Please input a json!')
        else:
            key_list = self.getkeys(data)
            for key in key_list:
                if (key == tagkey):
                    return True

        return False


def get_timestamp_sign(secret):
    timestamp = str(round(time.time() * 1000))
    #secret = "SEC767ff0761242961b5be36e26b7525f0ff7261be4b90b9639ce795b4cdd11c423"
    secret_enc = secret.encode('utf-8')
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    string_to_sign_enc = string_to_sign.encode('utf-8')
    hmac_code = hmac.new(secret_enc, string_to_sign_enc,
                digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    #print("timestamp: ", timestamp)
    #print("sign:", sign)
    return (timestamp, sign)

def get_signed_url(webhook, secret):
    timestamp, sign = get_timestamp_sign(secret)
    webhook = webhook + "&timestamp=" + timestamp + "&sign=" + sign
    return webhook


def get_webhook(webhook, secret, mode):
    if mode == 0: # only 敏感字
       webhook = webhook
    elif mode == 1 or  mode ==2 : # 敏感字和加签 或 # 敏感字+加签+ip
        webhook = get_signed_url(webhook, secret)
    else:
        webhook = ""
        print("error! mode:   ", mode ,"  webhook :  ", webhook)
    return webhook

def verify_datetime(datetime_):
    pattern = r'((?!0000)[0-9]{4}-((0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-8])|(0[13-9]|1[0-2])-(29|30)|(0[13578]|1[02])-31)|([0-9]{2}(0[48]|[2468][048]|[13579][26])|(0[48]|[2468][048]|[13579][26])00)-02-29) (20|21|22|23|[0-1]\d):[0-5]\d:[0-5]\d$'
    if re.match(pattern, datetime_):
        return True

    return False


def format_date_tz(raw_date):
    date = raw_date.replace('T', ' ').replace('Z', '').split('.')[0]
    if not verify_datetime(date):
        raise ValueError(raw_date, date)
    return date


def trim(docstring):
    if not docstring:
        return ''
    # Convert tabs to spaces (following the normal Python rules)
    # and split into a list of lines:
    lines = docstring.expandtabs().splitlines()
    # Determine minimum indentation (first line doesn't count):

    indent = sys.maxsize

    for line in lines[1:]:
        stripped = line.lstrip()
        if stripped:
            indent = min(indent, len(line) - len(stripped))
    # Remove indentation (first line is special):
    trimmed = [lines[0].strip()]
    if indent < sys.maxsize:
        for line in lines[1:]:
            trimmed.append(line[indent:].rstrip())
    # Strip off trailing and leading blank lines:
    while trimmed and not trimmed[-1]:
        trimmed.pop()
    while trimmed and not trimmed[0]:
        trimmed.pop(0)
    # Return a single string:
    return '\n'.join(trimmed)


def get_message():
    check_json = CheckJSON()
    try:
        if not check_json.is_exists(json.loads(request.data), 'topic'):
            data = json.loads(request.data)
            # print(json.dumps(data, encoding="UTF-8", ensure_ascii=False))

            alerts = data['alerts']
            # element_lens = len(alerts)
            # print(element_lens)

            msg_list = []
            str_msg = ''
            num = 0
            while num < len(alerts):
                alert_status = alerts[num].get('status')

                if alert_status == 'firing':
                    env = alerts[num].get('labels')['env']  # 告警环境
                    alert_object = alerts[num].get('labels')['app']  # 告警对象
                    alert_name = alerts[num].get('labels')['alertname']  # 告警主题
                    alert_desc = alerts[num].get('annotations')['description']  # 告警详情
                    alert_starts_at = format_date_tz(alerts[num].get('startsAt'))  # 触发时间
                    alert_status = alerts[num].get('status')  # 告警状态

                    str_msg = '''> **=============start============**\n\n> **告警环境:** {0}\n\n> **告警对象:** {1}\n\n> **告警主题:** {2}\n\n> **告警详情:** <font color='#FF4500'> {3} </font>\n\n> **触发时间:** {4}\n\n> **告警状态:** <font color='#FF4500'> {5} </font>\n\n> **=============end=============**'''.format(env, alert_object, alert_name, alert_desc, alert_starts_at, alert_status)

                elif alert_status == 'resolved':
                    env = alerts[num].get('labels')['env']  # 告警环境
                    alert_object = alerts[num].get('labels')['app']  # 告警对象
                    alert_name = alerts[num].get('labels')['alertname']  # 告警主题
                    alert_desc = alerts[num].get('annotations')['description']  # 告警详情
                    alert_starts_at = format_date_tz(alerts[num].get('startsAt'))  # 触发时间
                    alert_ends_at = format_date_tz(alerts[num].get('endsAt'))  # 恢复时间
                    alert_status = alerts[num].get('status')  # 告警状态

                    str_msg = '''> **============start=============**\n\n> **告警环境:** {0}\n\n> **告警对象:** {1}\n\n> **告警主题:** {2}\n\n> **告警详情:** <font color='#FF4500'> {3} </font>\n\n> **触发时间:** {4}\n\n> **恢复时间:** {5}\n\n> **告警状态:** <font color='#9ACD32'> {6} </font>\n\n> **===========end===============**'''.format(env, alert_object, alert_name, alert_desc, alert_starts_at, alert_ends_at, alert_status)


                msg_list.append(str_msg)
                # print(msg_list)
                str_msg = ''.join(msg_list)
                # print(str_msg)
                num += 1

            return trim(str_msg)
        else:
            data = json.loads(request.data)
            # print(json.dumps(data, encoding="UTF-8", ensure_ascii=False))
            alerts = data['alerts']

            msg_list = []
            str_msg = ''
            num = 0

            while num < len(alerts):
                alert_status = alerts[num].get('status')

                if alert_status == 'firing':
                    env = alerts[num].get('labels')['env']  # 告警环境
                    alert_object = alerts[num].get('labels')['app']  # 告警业务
                    alert_name = alerts[num].get('labels')['alertname']  # 告警主题
                    alert_group = alerts[num].get('annotations')['group']  # 告警消息组
                    alert_topic = alerts[num].get('annotations')['topic']  # 告警topic
                    alert_desc = alerts[num].get('annotations')['description']  # 告警详情
                    alert_starts_at = format_date_tz(alerts[num].get('startsAt'))  # 触发时间
                    alert_status = alerts[num].get('status')  # 告警状态

                    str_msg = '''
                        ============start=============
                        告警环境: {0}
                        告警业务: {1}
                        告警主题: {2}
                        告警消息组: {3}
                        告警Topic: {4}
                        告警详情: {5}
                        触发时间: {6}
                        告警状态: {7}
                        ===========end================
                    '''.format(env, alert_object, alert_name, alert_group,
                               alert_topic, alert_desc, alert_starts_at,
                               alert_status)
                elif alert_status == 'resolved':
                    env = alerts[num].get('labels')['env']  # 告警环境
                    alert_object = alerts[num].get('labels')['app']  # 告警业务
                    alert_name = alerts[num].get('labels')['alertname']  # 告警主题
                    alert_group = alerts[num].get('annotations')['group']  # 告警消息组
                    alert_topic = alerts[num].get('annotations')['topic']  # 告警topic
                    alert_desc = alerts[num].get('annotations')['description']  # 告警详情
                    alert_starts_at = format_date_tz(alerts[num].get('startsAt'))  # 触发时间
                    alert_ends_at = format_date_tz(alerts[num].get('endsAt'))  # 恢复时间
                    alert_status = alerts[num].get('status')  # 告警状态

                    str_msg = '''
                        ============start=============
                        告警环境: {0}
                        告警业务: {1}
                        告警主题: {2}
                        告警消息组: {3}
                        告警Topic: {4}
                        告警详情: {5}
                        触发时间: {6}
                        恢复时间: {7}
                        告警状态: {8}
                        ===========end================
                    '''.format(env, alert_object, alert_name, alert_group,
                               alert_topic, alert_desc, alert_starts_at,
                               alert_ends_at, alert_status)

                msg_list.append(str_msg)
                # print(msg_list)
                str_msg = ''.join(msg_list)
                # print(str_msg)
                num += 1

            return trim(str_msg)
    except Exception as e:
        logging.error(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + e)
        print(e)



@app.route('/dingtalk/webhook1/send', methods=['POST'])
def send_ding_message():
    robot_webhook = "https://oapi.dingtalk.com/robot/send?access_token=4704705dea51cb9d5bc78167sdsfafas1c9eaf1becf559d7d0311754adcf2fde31e2d46e"
    robot_secret = "SEC9c87827412e3d9c265e3b92b7f4067ad39d73dda7416825ab681f6bfsdf3f30ee1"
    webhook = get_webhook(robot_webhook, robot_secret, 1) # 主要模式有 0 : 敏感字 1:# 敏感字 +加签 3:敏感字+加签+IP

    # 请求头部
    header = {"Content-Type": "application/json", "Charset": "UTF-8"}

    msg = get_message()
    print(msg)
    #payload = {
    #    "msgtype": 'text',
    #    "text": {'content': msg},
    #}
    payload = {
        "msgtype": "markdown",
        "markdown": {"title": "芯动科技服务器告警", "text":msg},
    }

    # 对请求的数据进行json封装
    message_json = json.dumps(payload)
    # print(message_json)
    requests.DEFAULT_RETRIES = 3
    # 发送请求
    info = requests.post(url=webhook, data=message_json, headers=header)
    # 打印返回结果
    print(info.text)
    if (json.loads(info.text)['errcode'] == 0):
        return jsonify({
            'retval': 0,
            'msg': 'ok',
            'description': '告警消息发送成功'
        })

    logging.error(
        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) +
        ' 钉钉接口调用失败 ' + json.loads(info.text)['sub_msg'])
    return jsonify({
        'retval': json.loads(info.text)['sub_code'],
        'msg': json.loads(info.text)['sub_msg'],
        'description': '告警消息发送失败'
    })



if __name__ == "__main__":
    # 运维群机器人消息通知

    LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
    DATE_FORMAT = "%m/%d/%Y %H:%M:%S %p"
    logging.basicConfig(filename='/opt/py/dingtalk-robot.log', level=logging.DEBUG, format=LOG_FORMAT,
                        datefmt=DATE_FORMAT)
    print('---------------- 启动app前')


    app.run(debug=True, host='0.0.0.0', port=8060)
相关实践学习
容器服务Serverless版ACK Serverless 快速入门:在线魔方应用部署和监控
通过本实验,您将了解到容器服务Serverless版ACK Serverless 的基本产品能力,即可以实现快速部署一个在线魔方应用,并借助阿里云容器服务成熟的产品生态,实现在线应用的企业级监控,提升应用稳定性。
相关文章
|
6月前
|
缓存
ecs-centos分区空间大于70时发送钉钉告警并清理
当分区空间大于70时,开始清理并发送钉钉告警。
96 1
|
6月前
|
机器人 关系型数据库 MySQL
shell脚本实现文件自动清理并推送钉钉机器人告警
shell脚本实现文件自动清理并推送钉钉机器人告警
115 1
|
6月前
|
Prometheus Cloud Native 机器人
Prometheus告警简介
Prometheus告警简介
|
6月前
|
运维 监控 安全
调用钉钉机器人API接口将堡垒机安全运维告警单发给运维人员
调用钉钉机器人API接口将堡垒机安全运维告警单发给运维人员
195 0
|
24天前
|
Prometheus 运维 监控
智能运维实战:Prometheus与Grafana的监控与告警体系
【10月更文挑战第26天】Prometheus与Grafana是智能运维中的强大组合,前者是开源的系统监控和警报工具,后者是数据可视化平台。Prometheus具备时间序列数据库、多维数据模型、PromQL查询语言等特性,而Grafana支持多数据源、丰富的可视化选项和告警功能。两者结合可实现实时监控、灵活告警和高度定制化的仪表板,广泛应用于服务器、应用和数据库的监控。
134 3
|
17天前
|
数据采集 Prometheus 监控
Prometheus的告警规则
Prometheus的告警规则
42 11
|
18天前
|
Prometheus Cloud Native
Prometheus的告警处理
【10月更文挑战第31天】Prometheus的告警处理
23 3
|
18天前
|
Prometheus Kubernetes Cloud Native
Prometheus的告警配置
【10月更文挑战第31天】Prometheus的告警配置
26 1
|
23天前
|
Prometheus 运维 监控
智能运维实战:Prometheus与Grafana的监控与告警体系
【10月更文挑战第27天】在智能运维中,Prometheus和Grafana的组合已成为监控和告警体系的事实标准。Prometheus负责数据收集和存储,支持灵活的查询语言PromQL;Grafana提供数据的可视化展示和告警功能。本文介绍如何配置Prometheus监控目标、Grafana数据源及告警规则,帮助运维团队实时监控系统状态,确保稳定性和可靠性。
121 0
|
3月前
|
JSON 机器人 Go
go接收alertmanager告警并发送钉钉
go接收alertmanager告警并发送钉钉
下一篇
无影云桌面