大数据运维之数据质量管理

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: 大数据运维之数据质量管理

🎉这篇文章主要介绍了如何使用 Azkaban 调度数据质量监控流程,包括创建调度脚本、配置工作流等内容。文章还提供了一个完整的示例,包括创建 Python 脚本、配置Azkaban 工作流等步骤。

第1章 数据质量管理概述

1.1 数据质量管理定义

数据质量管理(Data Quality Management),是指对数据从计划、获取、存储、共享、维护、应用、消亡生命周期的每个阶段里可能引发的各类数据质量问题,进行识别、度量、监控、预警等一系列管理活动,并通过改善和提高组织的管理水平使得数据质量获得进一步提高。

数据质量管理是循环管理过程,其终极目标是通过可靠的数据提升数据在使用中的价值,并最终为企业赢得经济效益。

1.2 数据质量评价指标

数据质量管理的最终目标是改善,任何改善都是建立在评价的基础上。通常数据质量的评价标准包括以下内容。

评价标准 描述 监控项
唯一性 指主键保持唯一 字段唯一性检查
完整性 主要包括记录缺失和字段值缺失等方面 字段枚举值检查
字段记录数检查
字段空值检查
精确度 数据生成的正确性,数据在整个链路流转的正确性 波动阀值检查
合法性 主要包括格式、类型、域值的合法性 字段日期格式检查
字段长度检查
字段值域检查
时效性 主要包括数据处理的时效性 批处理是否按时完成

第2章 数据质量管理实操

2.1 需求分析

我们的数仓项目主要监控以下数据的指标:

ODS层数据量,每日环比和每周同比变化不能超过一定范围

DIM层不能出现id空值,重复值;

DWD层不能出现id空值,重复值;

在每层中任意挑选一张表作为示例。

检查项目 依据 异常值下限 异常值上限
ods_order_info 同比增长 数据总量 -10% 10%
环比增长 数据总量 -10% 50%
值域检查 final_amount 0 100
dwd_order_info 空值检查 id 0 10
重复值检查 id 0 5
dim_user_info 空值检查 id 0 10
重复值检查 id 0 5

2.2 功能模块

2.3 开发环境准备

2.3.1 Python开发环境准备

本文使用Python和Shell脚本实现数据质量监控的各项功能,故需先搭建相应的开发环境,Python开发可选择IDEA(需安装Python插件),或PyCharm等工具,本文使用IDEA作为开发工具。

1. 安装Python插件

(1)在IDEA中点击“File”,在下拉选择中点击“Settings…”

(2)点击“Plugins”,点击右上角的“Marketplace”,然后在搜索框中输入“python”,在搜索结果列表中找到Python插件,点击“Install”,安装插件。

2. 新建一个Python项目

(1)点击Idea中的“File”,在下列列表中点击“New”,在右侧弹出的列表中点击“Project…”

(2)在新建的工程中,点击“Python”,然后点击Next

(3)首次创建Python项目,会提示无Python SDK,此处选择Yes,后续再添加SDK。

(4)填写项目名称和项目路径等基本信息,点击Finish

(5)添加Python SDK

为了保证测试和运行的Python环境一致,我们配置项目采用远程集群的Python环境执行本地代码,以下为具体配置步骤。

第一步:点击“File”→“Project Structure”

第二步:按照下图操作,增加Python SDK。

第三步:点击“SSH Interpreter”,选择“Existing server configuration”,点击“…”

第四步:点击“+”,填入ssh连接信息,点击Next

第五步:点击Finish

第六步:点击OK

2.3.2 初始化MySQL环境

MySQL主要用于存储数据质量监控的结果值,这里需要提前建库建表。详细建表语句如下:

(1)创建data_supervisor库

drop database if exists data_supervisor;

create database data_supervisor;

(2)创建空值指标表,null_id

CREATE TABLE data_supervisor.null_id

(

dt date NOT NULL COMMENT ‘日期’,

tbl varchar(50) NOT NULL COMMENT ‘表名’,

col varchar(50) NOT NULL COMMENT ‘列名’,

value int DEFAULT NULL COMMENT ‘空ID个数’,

value_min int DEFAULT NULL COMMENT ‘下限’,

value_max int DEFAULT NULL COMMENT ‘上限’,

notification_level int DEFAULT NULL COMMENT ‘警告级别’,

PRIMARY KEY (dt, tbl, col)

) ENGINE = InnoDB

DEFAULT CHARSET = utf8

comment ‘空值指标表’;

(3)创建重复值指标表,duplicate

CREATE TABLE data_supervisor.duplicate

(

dt date NOT NULL COMMENT ‘日期’,

tbl varchar(50) NOT NULL COMMENT ‘表名’,

col varchar(50) NOT NULL COMMENT ‘列名’,

value int DEFAULT NULL COMMENT ‘重复值个数’,

value_min int DEFAULT NULL COMMENT ‘下限’,

value_max int DEFAULT NULL COMMENT ‘上限’,

notification_level int DEFAULT NULL COMMENT ‘警告级别’,

PRIMARY KEY (dt, tbl, col)

) ENGINE = InnoDB

DEFAULT CHARSET = utf8

comment ‘重复值指标表’;

(4)创建值域指标表,rng

CREATE TABLE data_supervisor.rng

(

dt date NOT NULL COMMENT ‘日期’,

tbl varchar(50) NOT NULL COMMENT ‘表名’,

col varchar(50) NOT NULL COMMENT ‘列名’,

value int DEFAULT NULL COMMENT ‘超出预定值域个数’,

range_min int DEFAULT NULL COMMENT ‘值域下限’,

range_max int DEFAULT NULL COMMENT ‘值域上限’,

value_min int DEFAULT NULL COMMENT ‘下限’,

value_max int DEFAULT NULL COMMENT ‘上限’,

notification_level int DEFAULT NULL COMMENT ‘警告级别’,

PRIMARY KEY (dt, tbl, col)

) ENGINE = InnoDB

DEFAULT CHARSET = utf8

comment ‘值域指标表’;

(5)创建环比增长指标表,day_on_day

CREATE TABLE data_supervisor.day_on_day

(

dt date NOT NULL COMMENT ‘日期’,

tbl varchar(50) NOT NULL COMMENT ‘表名’,

value double DEFAULT NULL COMMENT ‘环比增长百分比’,

value_min double DEFAULT NULL COMMENT ‘增长上限’,

value_max double DEFAULT NULL COMMENT ‘增长上限’,

notification_level int DEFAULT NULL COMMENT ‘警告级别’,

PRIMARY KEY (dt, tbl)

) ENGINE = InnoDB

DEFAULT CHARSET = utf8

comment ‘环比增长指标表’;

(6)创建同比增长指标表,week_on_week

CREATE TABLE data_supervisor.week_on_week

(

dt date NOT NULL COMMENT ‘日期’,

tbl varchar(50) NOT NULL COMMENT ‘表名’,

value double DEFAULT NULL COMMENT ‘同比增长百分比’,

value_min double DEFAULT NULL COMMENT ‘增长上限’,

value_max double DEFAULT NULL COMMENT ‘增长上限’,

notification_level int DEFAULT NULL COMMENT ‘警告级别’,

PRIMARY KEY (dt, tbl)

) ENGINE = InnoDB

DEFAULT CHARSET = utf8

comment ‘同比增长指标表’;

2.4 规则检测模块

2.4.1 单一规则检测脚本编写

检测规则脚本分为五类:分别是空id检查脚本、重复id检查脚本、值域检查脚本、数据量环比检查脚本和数据量同比检查脚本。

下面分别给大家介绍一下五类检测脚本的具体编写。

1.空id检查脚本

在Idea中创建一个文件null_id.sh,在文件中编写如下内容:

实现的主要功能是:计算空值个数,并将结果和自己定义的阈值上下限,插入到MySQL表中。

#!/usr/bin/env bash
# -*- coding: utf-8 -*-
# 检查id空值
# 解析参数
while getopts "t:d:c:s:x:l:" arg; do
  case $arg in
  # 要处理的表名
  t)
    TABLE=$OPTARG
    ;;
  # 日期
  d)
    DT=$OPTARG
    ;;
  # 要计算空值的列名
  c)
    COL=$OPTARG
    ;;
  # 空值指标下限
  s)
    MIN=$OPTARG
    ;;
  # 空值指标上限
  x)
    MAX=$OPTARG
    ;;
  # 告警级别
  l)
    LEVEL=$OPTARG
    ;;
  ?)
    echo "unkonw argument"
    exit 1
    ;;
  esac
done

#如果dt和level没有设置,那么默认值dt是昨天 告警级别是0

[ "D T " ] ∣ ∣ D T = DT" ] || DT=DT"]∣∣DT=(date -d ‘-1 day’ +%F)

[ “$LEVEL” ] || LEVEL=0

# 数仓DB名称

HIVE_DB=gmall

# 查询引擎

HIVE_ENGINE=hive

# MySQL相关配置

mysql_user=“root”

mysql_passwd=“000000”

mysql_host=“hadoop102”

mysql_DB=“data_supervisor”

mysql_tbl=“null_id”

# 认证为hive用户,如在非安全(Hadoop未启用Kerberos认证)环境中,则无需认证

kinit -kt /etc/security/keytab/hive.keytab hive

# 空值个数

RESULT=( ((HIVE_ENGINE -e “set hive.cli.print.header=false;select count(1) from H I V E D B . HIVE_DB.HIVEDB.TABLE where dt=‘$DT’ and $COL is null;”)

#结果插入MySQL

mysql -h"m y s q l h o s t " − u " mysql_host" -u"mysqlhost"u"mysql_user" -p"$mysql_passwd" \

-e"INSERT INTO m y s q l D B . mysql_DB.mysqlDB.mysql_tbl VALUES(‘D T ′ , ′ DT', 'DT,TABLE’, ‘$COL’, $RESULT, $MIN, $MAX, $LEVEL)

ON DUPLICATE KEY UPDATE `value`=R E S U L T , v a l u e m i n = RESULT, value_min=RESULT,valuemin=MIN, value_max=M A X , n o t i f i c a t i o n l e v e l = MAX, notification_level=MAX,notificationlevel=LEVEL;"

2.重复id检查脚本

在Idea中创建一个文件duplicate.sh,在文件中编写如下内容:

实现的主要功能是:计算重复值个数,并将结果和自己定义的阈值上下限,插入到MySQL表中。

#!/usr/bin/env bash
# -*- coding: utf-8 -*-
# 监控某张表一列的重复值
# 参数解析
while getopts "t:d:c:s:x:l:" arg; do
  case $arg in
  # 要处理的表名
  t)
    TABLE=$OPTARG
    ;;
  # 日期
  d)
    DT=$OPTARG
    ;;
  # 要计算重复值的列名
  c)
    COL=$OPTARG
    ;;
    # 重复值指标下限
  s)
    MIN=$OPTARG
    ;;
  # 重复值指标上限
  x)
    MAX=$OPTARG
    ;;
  # 告警级别
  l)
    LEVEL=$OPTARG
    ;;
  ?)
    echo "unkonw argument"
    exit 1
    ;;
  esac
done
#如果dt和level没有设置,那么默认值dt是昨天 告警级别是0
[ "$DT" ] || DT=$(date -d '-1 day' +%F)
[ "$LEVEL" ] || LEVEL=0
# 数仓DB名称
HIVE_DB=gmall
# 查询引擎
HIVE_ENGINE=hive
# MySQL相关配置
mysql_user="root"
mysql_passwd="000000"
mysql_host="hadoop102"
mysql_DB="data_supervisor"
mysql_tbl="duplicate"
# 认证为hive用户,如在非安全(Hadoop未启用Kerberos认证)环境中,则无需认证
kinit -kt /etc/security/keytab/hive.keytab hive
# 重复值个数
RESULT=$($HIVE_ENGINE -e "set hive.cli.print.header=false;select count(1) from (select $COL from $HIVE_DB.$TABLE where dt='$DT' group by $COL having count($COL)>1) t1;")
# 将结果插入MySQL
mysql -h"$mysql_host" -u"$mysql_user" -p"$mysql_passwd" \
  -e"INSERT INTO $mysql_DB.$mysql_tbl VALUES('$DT', '$TABLE', '$COL', $RESULT, $MIN, $MAX, $LEVEL)
ON DUPLICATE KEY UPDATE \`value\`=$RESULT, value_min=$MIN, value_max=$MAX, notification_level=$LEVEL;"

3. 值域检查脚本

在Idea中创建一个文件range.sh,在文件中编写如下内容:

实现的主要功能是:计算超出规定值域的值的个数,并将结果和自己定义的阈值上下限,插入到MySQL表中。

#!/usr/bin/env bash
# -*- coding: utf-8 -*-
# 计算某一列异常值个数
while getopts "t:d:l:c:s:x:a:b:" arg; do
  case $arg in
  # 要处理的表名
  t)
    TABLE=$OPTARG
    ;;
  # 日替
  d)
    DT=$OPTARG
    ;;
  # 要处理的列
  c)
    COL=$OPTARG
    ;;
  # 不在规定值域的值的个数下限
  s)
    MIN=$OPTARG
    ;;
  # 不在规定值域的值的个数上限
  x)
    MAX=$OPTARG
    ;;
  # 告警级别
  l)
    LEVEL=$OPTARG
    ;;
  # 规定值域为a-b
  a)
    RANGE_MIN=$OPTARG
    ;;
  b)
    RANGE_MAX=$OPTARG
    ;;
  ?)
    echo "unkonw argument"
    exit 1
    ;;
  esac
done
#如果dt和level没有设置,那么默认值dt是昨天 告警级别是0
[ "$DT" ] || DT=$(date -d '-1 day' +%F)
[ "$LEVEL" ] || LEVEL=0
# 数仓DB名称
HIVE_DB=gmall
# 查询引擎
HIVE_ENGINE=hive

4. 数据量环比检查脚本

在Idea中创建一个文件day_on_day.sh,在文件中编写如下内容:

实现的主要功能是:计算数据量环比增长值,并将结果和自己定义的阈值上下限,插入到MySQL表中。

#!/usr/bin/env bash
# -*- coding: utf-8 -*-
# 计算一张表单日数据量环比增长值
# 参数解析
while getopts "t:d:s:x:l:" arg; do
  case $arg in
  # 要处理的表名
  t)
    TABLE=$OPTARG
    ;;
  # 日期
  d)
    DT=$OPTARG
    ;;
  # 环比增长指标下限
  s)
    MIN=$OPTARG
    ;;
  # 环比增长指标上限
  x)
    MAX=$OPTARG
    ;;
  # 告警级别
  l)
    LEVEL=$OPTARG
    ;;
  ?)
    echo "unkonw argument"
    exit 1
    ;;
  esac
done
#如果dt和level没有设置,那么默认值dt是昨天 告警级别是0
[ "$DT" ] || DT=$(date -d '-1 day' +%F)
[ "$LEVEL" ] || LEVEL=0
# 数仓DB名称
HIVE_DB=gmall
# 查询引擎
HIVE_ENGINE=hive
# MySQL相关配置
mysql_user="root"
mysql_passwd="000000"
mysql_host="hadoop102"
mysql_DB="data_supervisor"
mysql_tbl="day_on_day"
# 认证为hive用户,如在非安全(Hadoop未启用Kerberos认证)环境中,则无需认证
kinit -kt /etc/security/keytab/hive.keytab hive
# 昨日数据量
YESTERDAY=$($HIVE_ENGINE -e "set hive.cli.print.header=false; select count(1) from $HIVE_DB.$TABLE where dt=date_add('$DT',-1);")
# 今日数据量
TODAY=$($HIVE_ENGINE -e "set hive.cli.print.header=false;select count(1) from $HIVE_DB.$TABLE where dt='$DT';")
# 计算环比增长值
if [ "$YESTERDAY" -ne 0 ]; then
  RESULT=$(awk "BEGIN{print ($TODAY-$YESTERDAY)/$YESTERDAY*100}")
else
  RESULT=10000
fi
# 将结果写入MySQL表格
mysql -h"$mysql_host" -u"$mysql_user" -p"$mysql_passwd" \
  -e"INSERT INTO $mysql_DB.$mysql_tbl VALUES('$DT', '$TABLE', $RESULT, $MIN, $MAX, $LEVEL)
ON DUPLICATE KEY UPDATE \`value\`=$RESULT, value_min=$MIN, value_max=$MAX, notification_level=$LEVEL;"

5. 数据量同比检查脚本

在Idea中创建一个文件week_on_week.sh,在文件中编写如下内容:

实现的主要功能是:计算数据量同比增长值,并将结果和自己定义的阈值上下限,插入到MySQL表中。

#!/usr/bin/env bash
# -*- coding: utf-8 -*-
# 计算一张表一周数据量同比增长值
# 参数解析
while getopts "t:d:s:x:l:" arg; do
  case $arg in
  # 要处理的表名
  t)
    TABLE=$OPTARG
    ;;
  # 日期
  d)
    DT=$OPTARG
    ;;
  # 同比增长指标下限
  s)
    MIN=$OPTARG
    ;;
  # 同比增长指标上限
  x)
    MAX=$OPTARG
    ;;
  # 告警级别
  l)
    LEVEL=$OPTARG
    ;;
  ?)
    echo "unkonw argument"
    exit 1
    ;;
  esac
done
#如果dt和level没有设置,那么默认值dt是昨天 告警级别是0
[ "$DT" ] || DT=$(date -d '-1 day' +%F)
[ "$LEVEL" ] || LEVEL=0
# 数仓DB名称
HIVE_DB=gmall
# 查询引擎
HIVE_ENGINE=hive
# MySQL相关配置
mysql_user="root"
mysql_passwd="000000"
mysql_host="hadoop102"
mysql_DB="data_supervisor"
mysql_tbl="week_on_week"
# 认证为hive用户,如在非安全(Hadoop未启用Kerberos认证)环境中,则无需认证
kinit -kt /etc/security/keytab/hive.keytab hive
# 上周数据量
LASTWEEK=$($HIVE_ENGINE -e "set hive.cli.print.header=false;select count(1) from $HIVE_DB.$TABLE where dt=date_add('$DT',-7);")
# 本周数据量
THISWEEK=$($HIVE_ENGINE -e "set hive.cli.print.header=false;select count(1) from $HIVE_DB.$TABLE where dt='$DT';")
# 计算增长
if [ $LASTWEEK -ne 0 ]; then
  RESULT=$(awk "BEGIN{print ($THISWEEK-$LASTWEEK)/$LASTWEEK*100}")
else
  RESULT=10000
fi
# 将结果写入MySQL
mysql -h"$mysql_host" -u"$mysql_user" -p"$mysql_passwd" \
  -e"INSERT INTO $mysql_DB.$mysql_tbl VALUES('$DT', '$TABLE', $RESULT, $MIN, $MAX, $LEVEL)
ON DUPLICATE KEY UPDATE \`value\`=$RESULT, value_min=$MIN, value_max=$MAX, notification_level=$LEVEL;"

2.4.2 数仓各层检测脚本编写

将上一节编写的单一规则检测脚本按照数仓分层进行集成,分别编写ODS层检测脚本,DWD层检测脚本和DIM层检测脚本。

每层详细集成步骤如下

1. ODS层

ODS层需要检查的指标如下表所示。

检查项目 依据 异常值下限 异常值上限
ods_order_info 同比增长 数据总量 -10% 10%
环比增长 数据总量 -10% 50%
值域检查 final_amount 0 100

在Idea中创建一个文件check_ods.sh,在文件中编写如下内容:

#!/usr/bin/env bash

DT=$1

[ "D T " ] ∣ ∣ D T = DT" ] || DT=DT"]∣∣DT=(date -d ‘-1 day’ +%F)

#检查表 ods_order_info 数据量日环比增长

#参数: -t 表名

# -d 日期

# -s 环比增长下限

# -x 环比增长上限

# -l 告警级别

bash day_on_day.sh -t ods_order_info -d “$DT” -s -10 -x 10 -l 1

#检查表 ods_order_info 数据量周同比增长

#参数: -t 表名

# -d 日期

# -s 同比增长下限

# -x 同比增长上限

# -l 告警级别

bash week_on_week.sh -t ods_order_info -d “$DT” -s -10 -x 50 -l 1

#检查表 ods_order_info 订单异常值

#参数: -t 表名

# -d 日期

# -s 指标下限

# -x 指标上限

# -l 告警级别

# -a 值域下限

# -b 值域上限

bash range.sh -t ods_order_info -d “$DT” -c final_amount -a 0 -b 100000 -s 0 -x 100 -l 1

2. DWD层

DWD层需要检查的项目下标所示。

检查项目 依据 异常值下限 异常值上限
dwd_order_info 空值检查 id 0 10
重复值检查 id 0 5

在Idea中创建一个文件check_dwd.sh,在文件中编写如下内容:

#!/usr/bin/env bash

DT=$1

[ "D T " ] ∣ ∣ D T = DT" ] || DT=DT"]∣∣DT=(date -d ‘-1 day’ +%F)

# 检查表 dwd_order_info 重复ID

#参数: -t 表名

# -d 日期

# -c 检查重复值的列

# -s 异常指标下限

# -x 异常指标上限

# -l 告警级别

bash duplicate.sh -t dwd_order_info -d “$DT” -c id -s 0 -x 5 -l 0

#检查表 dwd_order_info 的空ID

#参数: -t 表名

# -d 日期

# -c 检查空值的列

# -s 异常指标下限

# -x 异常指标上限

# -l 告警级别

bash null_id.sh -t dwd_order_info -d “$DT” -c id -s 0 -x 10 -l 0

3. DIM层

DIM层需要检查的项目如下表所示。

检查项目 依据 异常值下限 异常值上限
dim_user_info 空值检查 id 0 10
重复值检查 id 0 5

在Idea中创建一个文件check_dim.sh,在文件中编写如下内容:

#!/usr/bin/env bash

DT=$1

[ "D T " ] ∣ ∣ D T = DT" ] || DT=DT"]∣∣DT=(date -d ‘-1 day’ +%F)

#检查表 dim_user_info 的重复ID

#参数: -t 表名

# -d 日期

# -c 检查重复值的列

# -s 异常指标下限

# -x 异常指标上限

# -l 告警级别

bash duplicate.sh -t dim_user_info -d “$DT” -c id -s 0 -x 5 -l 0

#检查表 dim_user_info 的空ID

#参数: -t 表名

# -d 日期

# -c 检查空值的列

# -s 异常指标下限

# -x 异常指标上限

# -l 告警级别

bash null_id.sh -t dim_user_info -d “$DT” -c id -s 0 -x 10 -l 0

2.5 告警集成模块

该模块主要用于检查MySQL中的检测结果的异常,若有异常出现就发送警告。警告方式可选择邮件或者集成第三方告警平台睿象云。

(1)环境准备

在MySQL官网下载mysql-connector-python-2.1.7-1.el7.x86_64.rpm,下载地址如下:

https://repo.mysql.com/yum/mysql-connectors-community/el/7/x86_64/mysql-connector-python-2.1.7-1.el7.x86_64.rpm

将该rpm包上传至每台服务器,并安装:

[atguigu@hadoop102 ~]$ sudo rpm -i mysql-connector-python-2.1.7-1.el7.x86_64.rpm

(2)新建python脚本用于查询数据监控结果表格并发送告警邮件,该脚本主要由三个函数组成:

l read_table用于读取指标有问题的数据

l one_alert函数用于向睿象云发送告警

l mail_alert函数用于发送邮件告警

在Idea中创建一个文件check_notification.py,在文件中编写如下内容:

#!/usr/bin/env python
# -- coding: utf-8 --
import mysql.connector
import sys
import smtplib
from email.mime.text import MIMEText
from email.header import Header
import datetime
import urllib
import urllib2
import random
def get_yesterday():
“”"
:return: 前一天的日期
“”"
today = datetime.date.today()
one_day = datetime.timedelta(days=1)
yesterday = today - one_day
return str(yesterday)
def read_table(table, dt):
“”"
:param table:读取的表名
:param dt:读取的数据日期
:return:表中的异常数据(统计结果超出规定上下限的数据)
“”"
# mysql必要参数设置,需根据实际情况作出修改
mysql_user = “root”
mysql_password = “000000”
mysql_host = “hadoop102”
mysql_schema = “data_supervisor”
# 获取Mysql数据库连接
connect = mysql.connector.connect(user=mysql_user, password=mysql_password, host=mysql_host, database=mysql_schema)
cursor = connect.cursor()
# 查询表头
# [‘dt’, ‘tbl’, ‘col’, ‘value’, ‘value_min’, ‘value_max’, ‘notification_level’]
query = "desc " + table
cursor.execute(query)
head = map(lambda x: str(x[0]), cursor.fetchall())
# 查询异常数据(统计结果超出规定上下限的数据)
# [(datetime.date(2021, 7, 16), u’dim_user_info’, u’id’, 7, 0, 5, 1),
# (datetime.date(2021, 7, 16), u’dwd_order_id’, u’id’, 10, 0, 5, 1)]
query = (“select * from " + table + " where dt='” + dt + “’ and value not between value_min and value_max”)
cursor.execute(query)
cursor_fetchall = cursor.fetchall()
# 将指标和表头映射成为dict数组
#[{‘notification_level’: 1, ‘value_min’: 0, ‘value’: 7, ‘col’: u’id’, ‘tbl’: u’dim_user_info’, ‘dt’: datetime.date(2021, 7, 16), ‘value_max’: 5},
# {‘notification_level’: 1, ‘value_min’: 0, ‘value’: 10, ‘col’: u’id’, ‘tbl’: u’dwd_order_id’, ‘dt’: datetime.date(2021, 7, 16), ‘value_max’: 5}]
fetchall = map(lambda x: dict(x), map(lambda x: zip(head, x), cursor_fetchall))
return fetchall
def one_alert(line):
“”"
集成第三方告警平台睿象云,使用其提供的通知媒介发送告警信息
:param line: 一个等待通知的异常记录,{‘notification_level’: 1, ‘value_min’: 0, ‘value’: 7, ‘col’: u’id’, ‘tbl’: u’dim_user_info’, ‘dt’: datetime.date(2021, 7, 16), ‘value_max’: 5}
“”"
# 集成睿象云需要使用的rest接口,和APP KEY,须在睿象云平台获取
one_alert_key = “c2030c9a-7896-426f-bd64-59a8889ac8e3”
one_alert_host = “http://api.aiops.com/alert/api/event”
# 根据睿象云的rest api要求,传入必要的参数
data = {
 “app”: one_alert_key,
 “eventType”: “trigger”,
 “eventId”: str(random.randint(10000, 99999)),
 “alarmName”: “”.join([“表格”, str(line[“tbl”]), “数据异常.”]),
 “alarmContent”: “”.join([“指标”, str(line[“norm”]), “值为”, str(line[“value”]),
 “, 应为”, str(line[“value_min”]), “-”, str(line[“value_max”]),
 “, 参考信息:” + str(line[“col”]) if line.get(“col”) else “”]),
 “priority”: line[“notification_level”] + 1
}
# 使用urllib和urllib2向睿象云的rest结构发送请求,从而触发睿象云的通知策略
body = urllib.urlencode(data)
request = urllib2.Request(one_alert_host, body)
urlopen = urllib2.urlopen(request).read().decode(‘utf-8’)
print urlopen
def mail_alert(line):
“”"
使用电子邮件的方式发送告警信息
:param line: 一个等待通知的异常记录,{‘notification_level’: 1, ‘value_min’: 0, ‘value’: 7, ‘col’: u’id’, ‘tbl’: u’dim_user_info’, ‘dt’: datetime.date(2021, 7, 16), ‘value_max’: 5}
“”"
# smtp协议发送邮件的必要设置
mail_host = “smtp.126.com”
mail_user = “skiinder@126.com”
mail_pass = “KADEMQZWCPFWZETF”
# 告警内容
message = [“”.join([“表格”, str(line[“tbl”]), “数据异常.”]),
 “”.join([“指标”, str(line[“norm”]), “值为”, str(line[“value”]),
 “, 应为”, str(line[“value_min”]), “-”, str(line[“value_max”]),
 “, 参考信息:” + str(line[“col”]) if line.get(“col”) else “”])]
# 告警邮件,发件人
sender = mail_user
# 告警邮件,收件人
receivers = [mail_user]
# 将邮件内容转为html格式
mail_content = MIMEText(“”.join([“”, “
”.join(message), “”]), “html”, “utf-8”)
mail_content[“from”] = sender
mail_content[“to”] = receivers[0]
mail_content[“Subject”] = Header(message[0], “utf-8”)
# 使用smtplib发送邮件
try:
 smtp = smtplib.SMTP_SSL()
 smtp.connect(mail_host, 465)
 smtp.login(mail_user, mail_pass)
 content_as_string = mail_content.as_string()
 smtp.sendmail(sender, receivers, content_as_string)
except smtplib.SMTPException as e:
 print e
def main(argv):
“”"
:param argv: 系统参数,共三个,第一个为python脚本本身,第二个为告警方式,第三个为日期
“”"
# 如果没有传入日期参数,将日期定为昨天
if len(argv) >= 3:
 dt = argv[2]
else:
 dt = get_yesterday()
notification_level = 0
# 通过参数设置告警方式,默认是睿象云
alert = None
if len(argv) >= 2:
 alert = {
 “mail”: mail_alert,
 “one”: one_alert
 }[argv[1]]
if not alert:
 alert = one_alert
# 遍历所有表,查询所有错误内容,如果大于设定警告等级,就发送警告
for table in [“day_on_day”, “duplicate”, “null_id”, “rng”, “week_on_week”]:
 for line in read_table(table, dt):
 if line[“notification_level”] >= notification_level:
 line[“norm”] = table
 alert(line)
if name == “main”:
# 两个命令行参数
# 第一个为警告类型:one或者mail
# 第二个为日期,留空取昨天
main(sys.argv)

2.6 调度模块

该模块的主要功能为调度数据质量监控流程。数据质量监控工作流也采用Azkaban进行调度。数据质量监控工作流必定依赖数据仓库工作流,此处为了解耦,利用Azkaban API主动监视数据仓库工作流的执行状态,进而触发数据质量监控工作流。

以下是所有脚本内容:

1.Azkaban REST API 封装脚本

该脚本主要是对Azkaban API的封装,主要有三个方法:

l login函数可以登录Azkanban并返回session_id

l get_exec_id函数可以获取正在执行的工作流程的Execution ID

l wait_node可以等待指定Flow中某一结点执行完毕并判断其是否执行成功

在Idea中创建一个文件azclient.py,在文件中编写如下内容:

#!/usr/bin/env python
# -- coding: utf-8 --
import time
import urllib
import urllib2
import json
# Azkaban API 接口地址
az_url = “http://hadoop102:8081/”
# Azkaban用户名
az_username = “atguigu”
# Azkaban密码
az_password = “atguigu”
# 工程名称
project = “gmall”
# flow名称
flow = “gmall”
def post(url, data):
“”"
发送post请求到指定网址
:param url: 指定网址
:param data: 请求参数
:return: 请求结果
“”"
body = urllib.urlencode(data)
request = urllib2.Request(url, body)
urlopen = urllib2.urlopen(request).read().decode(‘utf-8’)
return json.loads(urlopen)
def get(url, data):
“”"
发送get请求到指定网址
:param url: 指定网址
:param data: 请求参数
:return: 请求结果
“”"
body = urllib.urlencode(data)
urlopen = urllib2.urlopen(url + body).read().decode(‘utf-8’)
return json.loads(urlopen)
def login():
“”"
使用AuthenticateAPI进行azkaban身份认证,获取session ID
:return: 返回session_id
“”"
data = {
 “action”: “login”,
 “username”: az_username,
 “password”: az_password
}
auth = post(az_url, data)
return str(auth.get(u"session.id"))
def get_exec_id(session_id):
“”"
使用Fetch Running Executions of a FlowAPI获取正在执行的Flow的ExecId
:param session_id: 和azkaban通讯的session_id
:param project: 项目名称
:param flow: 工作流名称
:return: 执行ID
“”"
data = {
 “session.id”: session_id,
 “ajax”: “getRunning”,
 “project”: project,
 “flow”: flow
}
execs = get(az_url + “executor?”, data).get(u"execIds")
if execs:
 return str(execs[0])
else:
 return None
def wait_node(session_id, exec_id, node_id):
“”"
循环使用Fetch a Flow ExecutionAPI获取指定Flow中的某个节点(job)的执行状态,直到其执行完成
:param session_id: 和azkaban通讯的session_id
:param exec_id: 执行ID
:param node_id: 指定节点(job)
:return: 该节点是否成功执行完毕
“”"
data = {
 “session.id”: session_id,
 “ajax”: “fetchexecflow”,
 “execid”: exec_id
}
status = None
# 若指定Flow中的指定Node(job)的执行状态是未完成的状态,就一直循环
while status not in [“SUCCEEDED”, “FAILED”, “CANCELLED”, “SKIPPED”, “KILLED”]:
 # 获取指定Flow的当前的执行信息
 flow_exec = get(az_url + “executor?”, data)
 # 从该Flow的执行信息中获取nodes字段的值,并遍历寻找特定的节点(job)信息,进而获取该节点(job)的状态
 for node in flow_exec.get(u"nodes"):
 if unicode(node_id) == node.get(u"id"):
 status = str(node.get(u"status"))
 print " ".join([node_id, status])
 # 等待1s,进入下一轮循环判断
 time.sleep(1)
return status == “SUCCEEDED”

2.ODS层调度脚本

该脚本用于检查ODS层数据质量。

在Idea中创建一个文件check_ods.py,在文件中编写如下内容:

#!/usr/bin/env python
# -- coding: utf-8 --
import sys
import os
from azclient import login,wait_node,get_exec_id
from check_notification import get_yesterday
def check_ods(dt, session_id, exec_id):
“”"
检查ODS层数据质量
:param dt: 日期
:param session_id: 和azkaban通讯的session_id
:param exec_id: 指定的执行ID
:return: None
“”"
if wait_node(session_id, exec_id, “hdfs_to_ods_db”) and wait_node(session_id, exec_id, “hdfs_to_ods_log”):
 os.system("bash check_ods.sh " + dt)
if name == ‘main’:
argv = sys.argv
# 获取session_id
session_id = login()
# 获取执行ID。只有在原Flow正在执行时才能获取
exec_id = get_exec_id(session_id)
# 获取日期,如果不存在取昨天
if len(argv) >= 2:
 dt = argv[1]
else:
 dt = get_yesterday()
# 检查各层数据质量
if exec_id:
 check_ods(dt, session_id, exec_id)

3.DWD层调度脚本

该脚本用于检查DWD层数据质量。

在Idea中创建一个文件check_dwd.py,在文件中编写如下内容:

#!/usr/bin/env python
# -- coding: utf-8 --
import sys
import os
from azclient import login, wait_node, get_exec_id
from check_notification import get_yesterday
def check_dwd(dt, session_id, exec_id):
“”"
检查DWD层数据质量
:param dt: 日期
:param session_id: 和azkaban通讯的session_id
:param exec_id: 指定的执行ID
:return: None
“”"
if wait_node(session_id, exec_id, “ods_to_dwd_db”) and wait_node(session_id, exec_id, “ods_to_dwd_log”):
 os.system("bash check_dwd.sh " + dt)
if name == ‘main’:
argv = sys.argv
# 获取session_id
session_id = login()
# 获取执行ID。只有在原Flow正在执行时才能获取
exec_id = get_exec_id(session_id)
# 获取日期,如果不存在取昨天
if len(argv) >= 2:
 dt = argv[1]
else:
 dt = get_yesterday()
# 检查各层数据质量
if exec_id:
 check_dwd(dt, session_id, exec_id)

4.DIM层调度脚本

该脚本用于检查DIM层数据质量。

在Idea中创建一个文件check_dim.py,在文件中编写如下内容:

#!/usr/bin/env python
# -- coding: utf-8 --
import sys
import os
from azclient import login, wait_node, get_exec_id
from check_notification import get_yesterday
def check_dim(dt, session_id, exec_id):
“”"
检查DIM层数据质量
:param dt: 日期
:param session_id: 和azkaban通讯的session_id
:param exec_id: 指定的执行ID
:return: None
“”"
if wait_node(session_id, exec_id, “ods_to_dim_db”):
 os.system("bash check_dim.sh " + dt)
if name == ‘main’:
argv = sys.argv
# 获取session_id
session_id = login()
# 获取执行ID。只有在原Flow正在执行时才能获取
exec_id = get_exec_id(session_id)
# 获取日期,如果不存在取昨天
if len(argv) >= 2:
 dt = argv[1]
else:
 dt = get_yesterday()
# 检查各层数据质量
if exec_id:
 check_dim(dt, session_id, exec_id)

5.Azkaban工作流配置文件

(1)在Idea中创建一个文件azkaban.project,在文件中编写如下内容:

azkaban-flow-version: 2.0

(2)在Idea中创建一个文件data_supervisor.flow,在文件中编写如下内容:

nodes:
- name: check_ods
type: command
config:
command: python check_ods.py ${dt}
- name: check_dwd
type: command
config:
command: python check_dwd.py ${dt}
- name: check_dim
type: command
config:
command: python check_dim.py ${dt}
- name: check_notification
type: command
dependsOn:
 - check_ods
 - check_dwd
 - check_dim
config:
command: python check_notification.py ${alert} ${dt}

(3)将所有文件打包成data_supervisor.zip文件

(4)在Azkaban框架中新建项目并上传该文件,可看到如下图所示工作流。

(5)先启动数仓工作流,在执行过程中,启动质量监控工作流,并传入如下参数

等待任务执行完毕,观察邮箱是否有告警邮件

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
18小时前
|
机器学习/深度学习 人工智能 运维
智能化运维:AI在IT管理中的应用与挑战
随着人工智能(AI)技术的不断进步,其在信息技术(IT)运维领域的应用日益广泛。从自动化故障检测到智能决策支持系统,AI技术正逐步改变着传统运维的面貌。本文将探讨AI在IT运维中的具体应用场景,分析其带来的效率提升和成本节约,同时指出实施过程中可能遇到的技术和管理上的挑战,并提出相应的解决策略。通过深入分析,本文旨在为IT管理者提供一份关于如何有效整合AI技术以优化运维实践的参考指南。
|
1天前
|
机器学习/深度学习 运维 监控
智能化运维:利用机器学习优化IT基础设施管理
【7月更文挑战第7天】在数字化时代,IT基础设施的复杂性不断增加,传统的运维方法难以应对日益增长的挑战。本文探讨了如何通过机器学习技术来提升运维效率,实现智能化管理。我们将分析机器学习在自动化故障检测、预测性维护和资源优化中的应用实例,并讨论实施这些技术时面临的挑战与解决策略。
|
1天前
|
机器学习/深度学习 人工智能 运维
智能运维:利用机器学习优化IT基础设施管理
【7月更文挑战第7天】在数字化浪潮不断推进的今天,企业对IT基础设施的管理要求越来越高。传统的运维模式已难以满足现代企业的需求,智能运维(AIOps)应运而生。本文将探讨如何通过机器学习技术来优化IT基础设施的管理,提高故障预测的准确性,自动化日常任务,并实现个性化的报警系统,从而提升运维效率和服务质量。
|
3天前
|
机器学习/深度学习 运维 算法
智能化运维:利用机器学习优化IT基础设施管理
在信息技术飞速发展的今天,传统的运维模式已经难以满足现代企业的需求。本文将探讨如何通过引入机器学习技术,实现智能化运维,从而优化IT基础设施的管理效率和响应速度。我们将从机器学习的基础概念出发,逐步深入到其在运维领域的应用实例,最后讨论实施智能化运维可能面临的挑战及解决策略。
|
3天前
|
运维 监控 大数据
部署-Linux01,后端开发,运维开发,大数据开发,测试开发,后端软件,大数据系统,运维监控,测试程序,网页服务都要在Linux中进行部署
部署-Linux01,后端开发,运维开发,大数据开发,测试开发,后端软件,大数据系统,运维监控,测试程序,网页服务都要在Linux中进行部署
|
7天前
|
机器学习/深度学习 人工智能 运维
智能运维:利用人工智能优化IT基础设施管理
【6月更文挑战第30天】随着企业对信息技术的依赖性不断增强,传统的运维管理方法已无法满足现代业务的需求。智能运维(AIOps)作为一种新兴的运维模式,通过集成大数据、机器学习和自动化技术,旨在提高运维效率,减少系统故障时间,并提升用户体验。本文将探讨智能运维的核心概念、实施步骤及其对企业IT基础设施管理的积极影响,同时也会讨论在实际应用中可能遇到的挑战与解决方案。
23 2
|
9天前
|
机器学习/深度学习 运维 算法
智能运维的崛起:机器学习在IT管理中的应用与挑战
随着企业对信息技术依赖程度的不断加深,传统的运维模式已经难以满足现代业务的需求。本文将深入探讨如何通过机器学习技术提升运维效率,分析其在故障预测、自动化处理和安全防护等方面的应用,并讨论实施过程中可能遇到的技术与管理挑战。文章旨在为IT专业人士提供一种前瞻性的视角,以适应日益复杂的运维环境。
13 0
|
9天前
|
机器学习/深度学习 人工智能 运维
智能化运维:AI在IT管理中的应用与挑战
【6月更文挑战第28天】随着人工智能技术的飞速发展,其在IT运维领域的应用逐渐深入。本文将探讨AI技术在智能化运维中的角色,包括自动化故障诊断、预测性维护、以及安全监控等方面。同时,我们也将分析实施智能化运维时面临的技术挑战和伦理问题,旨在为读者提供一个关于如何有效整合AI技术进入IT运维实践的全面视角。
|
10天前
|
机器学习/深度学习 运维 监控
智能化运维:利用机器学习优化IT基础设施管理
随着信息技术的飞速发展,企业和组织越来越依赖于高效、可靠的IT基础设施。然而,传统的运维方法往往无法满足现代业务需求的速度和规模。本文将探讨如何通过机器学习技术来优化IT基础设施的管理,提高运维效率,降低风险,并确保系统的高可用性。我们将分析机器学习在自动化故障检测、预测性维护、资源分配和安全监控方面的应用,以及这些技术如何帮助运维团队更好地理解和优化他们的IT环境。
|
10天前
|
机器学习/深度学习 人工智能 运维
智能化运维:AI在IT管理中的革新作用
随着人工智能(AI)技术的飞速发展,其在信息技术(IT)运维领域的应用正逐渐成熟,并开始引领一场革命。本文将探讨AI技术如何优化传统的IT运维流程,提高效率与响应速度,并预测未来运维的发展方向。通过分析实际案例和最新的研究成果,本文旨在为读者提供一个关于AI在现代IT运维中角色和影响的全面视角。