基于Spark SQL实现对HDFS操作的实时监控报警

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: E-MapReduce计划从EMR-3.18.1版本开始提供Spark Streaming SQL的预览版功能。Spark Streaming SQL是在Spark Structured Streaming的基础上做了进一步封装,方便用户使用SQL语言进行Spark流式分析开发。

1.前言

E-MapReduce计划从EMR-3.18.1版本开始提供Spark Streaming SQL的预览版功能。Spark Streaming SQL是在Spark Structured Streaming的基础上做了进一步封装,方便用户使用SQL语言进行Spark流式分析开发。Spark Streaming SQL直接地透明地受惠于Spark SQL的优化带来的性能提升,同时也遵循Spark Structured Streaming的语法约束,例如Spark Structured Streaming不支持多流聚合查询,Spark Streaming SQL也就同样不支持。关于EMR Spark Streaming SQL的使用入门,请参考:

2.案例介绍

本文将使用Spark Streaming SQL实现对HDFS audilog的实时统计分析。数据流图如下所示:
image

一共分为4步:

  • 采集HDFS的audit-log到LogService中。
  • Spark Streaming SQL从LogService消费数据并进行分析。
  • 分析结果写到到存储系统中。
  • 下游系统从存储系统中查询和使用分析结果,典型的下游系统有报表或者监控报警。

本文专注于第二步,即如何使用Spark Streaming SQL分析LogService数据。

3.案例实现

本次演示中,我们将结果数据写到Kafka中。

3.1 环境准备

在E-MapReduce中创建两个集群:Hadoop集群和Kafka集群。这里就不详细演示如何创建EMR集群了,不熟悉的可以参考帮助文档

3.2 创建数据表

  • 创建loghub数据源表
CREATE DATABASE IF NOT EXISTS default;
USE default;
DROP TABLE IF EXISTS loghub_hdfs_auditlog;

CREATE TABLE loghub_hdfs_auditlog
USING loghub
OPTIONS (
sls.project = "${SLS_PROJECT_NAME}",
sls.store = "${SLS_STORE_NAME}",
access.key.id = "${ACCESS_KEY_ID}",
access.key.secret = "${ACCESS_KEY_SECRET}",
endpoint = "${SLS_ENDPOINT}",
zookeeper.connect.address = "${ZOOKEEPER_ADDRESS}");
  • 创建Kafka输出表
CREATE DATABASE IF NOT EXISTS default;
USE default;
DROP TABLE IF EXISTS kafka_hdfs_auditlog_analysis;

CREATE TABLE kafka_hdfs_auditlog_analysis
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = 'temp_hdfs_auditlog_analysis',
output.mode = 'complete',
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

注意:

  1. 具体做查询分析时,需要单独创建一个Kafka结果表。这是因为每个查询结果不一样,输出表的schema也就不一样。
  2. 根据具体的查询结果schema,我们需要预先在Kafka Schema Registry中注册好Kafka Topic Schema信息。
  • Schema定义示例:
{"namespace": "org.apache.spark.emr.baseline.testing",
 "type": "record",
 "name": "TempResult",
 "fields": [
    {"name": "avg_ss_quantity", "type": ["double", "null"]},
    {"name": "avg_ss_ext_sales_price", "type": [{"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 6}, "null"]},
    {"name": "avg_ss_ext_wholesale_cost", "type": [{"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 6}, "null"]},
    {"name": "sum_ss_ext_wholesale_cost", "type": [{"type": "bytes", "logicalType": "decimal", "precision": 17, "scale": 2}, "null"]}
 ]
}

将schema定义保存到文件中,并使用脚本工具注册schema到Kafka Schema Registry中。

python ./schema_register.py ${SCHEMA_REGISTRY_URL} ${TOPIC_NAME} ${SCHEMA_FILE}

3.3 统计5分钟内各个操作次数

  • 注册Kafka Topic Schema:
{"type":"record",
 "name":"TempResult",
 "namespace":"org.apache.spark.sql.streaming.test",
 "fields":[
    {"name":"op","type":["string","null"]},
    {"name":"count","type":"long"},
    {"name":"window_time","type":
        {"type":"record",
         "name":"window_time",
         "namespace":"org.apache.spark.sql.streaming.test.window_time",
         "fields":[
            {"name":"start","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]},
            {"name":"end","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]}
         ]
        }
    }
  ]
}
  • 创建一个结果表,命名为:“kafka_hdfs_auditlog_op_count_in_5_mins”。
DROP TABLE IF EXISTS kafka_hdfs_auditlog_op_count_in_5_mins;

CREATE TABLE kafka_hdfs_auditlog_op_count_in_5_mins
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "kafka_hdfs_auditlog_op_count_in_5_mins",
output.mode = "append",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "TempResult",
kafka.schema.record.namespace = "org.apache.spark.sql.streaming.test");
  • 查询分析
SET streaming.query.name=hdfs_auditlog_op_count_in_5_mins;
SET spark.sql.streaming.checkpointLocation.hdfs_auditlog_op_count_in_5_mins=/tmp/spark/sql/streaming/hdfs_auditlog_op_count_in_5_mins;

INSERT INTO kafka_hdfs_auditlog_op_count_in_5_mins
SELECT cmd op, count(*) count, window window_time
FROM loghub_hdfs_auditlog
GROUP BY TUMBLING(__time__, interval 5 minute), op
HAVING delay(__time__) < '5 minutes';

注:

  1. “streaming.query.name”可以为任意名字。必填配置。
  2. “spark.sql.streaming.checkpointLocation.${streaming.query.name}”对“streaming.query.name”的job配置checkpoint路径,可以为任意hdfs路径。必填配置。
  • 查看结果
通过Kafka命令行,查看kafka_hdfs_auditlog_op_count_in_5_mins topic数据:

窗口: 1550493600000 ~ 1550493900000
{"op":{"string":"create"},"count":47438,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"delete"},"count":181197,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"getfileinfo"},"count":265451,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"listStatus"},"count":641205,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"mkdirs"},"count":12171,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"append"},"count":30981,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"rename"},"count":169709,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"setPermission"},"count":52164,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"open"},"count":436877,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}

3.4 统计1分钟内各个来源IP的open操作次数

  • 注册Kafka Topic Schema:
{"type":"record",
 "name":"TempResult",
 "namespace":"org.apache.spark.sql.streaming.test",
 "fields":[
    {"name":"ip","type":["string","null"]},
    {"name":"count","type":"long"},
    {"name":"window_time","type":
        {"type":"record",
         "name":"window_time",
         "namespace":"org.apache.spark.sql.streaming.test.window_time",
         "fields":[
            {"name":"start","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]},
            {"name":"end","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]}
         ]
        }
    }
  ]
}
  • 创建一个结果表,命名为:“kafka_hdfs_auditlog_read_count_per_ip_in_1_mins”。
DROP TABLE IF EXISTS kafka_hdfs_auditlog_read_count_per_ip_in_1_mins;

CREATE TABLE kafka_hdfs_auditlog_read_count_per_ip_in_1_mins
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "kafka_hdfs_auditlog_read_count_per_ip_in_1_mins",
output.mode = "append",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "TempResult",
kafka.schema.record.namespace = "org.apache.spark.sql.streaming.test");
  • 查询分析
SET streaming.query.name=hdfs_auditlog_read_count_per_ip_in_1_mins;
SET spark.sql.streaming.checkpointLocation.hdfs_auditlog_read_count_per_ip_in_1_mins=/tmp/spark/sql/streaming/hdfs_auditlog_read_count_per_ip_in_1_mins;

INSERT INTO kafka_hdfs_auditlog_read_count_per_ip_in_1_mins
SELECT ip, count(*) count, window window_time
FROM loghub_hdfs_auditlog
where cmd='open'
GROUP BY TUMBLING(__time__, interval 1 minute), ip
HAVING delay(__time__) < '1 minutes';
  • 查看结果
窗口: 1550540940000 ~ 1550541000000
{"ip":{"string":"172.*.*.130"},"count":69090,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.1"},"count":69266,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"10.*.*.1"},"count":5129,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.52"},"count":70469,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"10.*.*.24"},"count":7206,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.48"},"count":136101,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.19"},"count":67226,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.82"},"count":141886,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.50"},"count":69165,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.54"},"count":151539,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}

窗口: 1550541000000 ~ 1550541060000
{"ip":{"string":"192.*.*.22"},"count":77776,"window_time":{"start":{"long":1550541000000},"end":{"long":1550541060000}}}
{"ip":{"string":"10.*.*.111"},"count":9373,"window_time":{"start":{"long":1550541000000},"end":{"long":1550541060000}}}
{"ip":{"string":"10.*.*.2"},"count":9329,"window_time":{"start":{"long":1550541000000},"end":{"long":1550541060000}}}

窗口: 1550541060000 ~ 1550541120000
{"ip":{"string":"172.*.*.207"},"count":10481,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}
{"ip":{"string":"192.*.*.138"},"count":28965,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}
{"ip":{"string":"10.*.*.160"},"count":22015,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}
{"ip":{"string":"172.*.*.234"},"count":32892,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}

3.5 统计1分钟内QPS超过100的来源IP

  • 注册Kafka Topic Schema:
{"type":"record",
 "name":"TempResult",
 "namespace":"org.apache.spark.sql.streaming.test",
 "fields":[
    {"name":"ip","type":["string","null"]},
    {"name":"qps","type":["double","null"]},
    {"name":"window_time","type":
        {"type":"record",
         "name":"window_time",
         "namespace":"org.apache.spark.sql.streaming.test.window_time",
         "fields":[
            {"name":"start","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]},
            {"name":"end","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]}
         ]
        }
    }
  ]
}
  • 创建一个结果表,命名为:“kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins”。
DROP TABLE IF EXISTS kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins;

CREATE TABLE kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins",
output.mode = "append",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "TempResult",
kafka.schema.record.namespace = "org.apache.spark.sql.streaming.test");
  • 查询分析
SET streaming.query.name=hdfs_auditlog_qps_gt_100_ip_in_1_mins;
SET spark.sql.streaming.checkpointLocation.hdfs_auditlog_qps_gt_100_ip_in_1_mins=/tmp/spark/sql/streaming/hdfs_auditlog_qps_gt_100_ip_in_1_mins;

INSERT INTO kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins
SELECT ip, qps, window window_time
FROM 
  (SELECT ip, count(cmd)/60 qps, window
  FROM loghub_hdfs_auditlog
  GROUP BY TUMBLING(__time__, interval 1 minute), ip
  HAVING delay(__time__) < '1 minutes') t
WHERE qps > 100;
  • 查看结果
窗口:1550543160000 ~ 1550543220000
{"ip":{"string":"10.*.*.140"},"qps":{"double":328.6333333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.13"},"qps":{"double":276.43333333333334},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.27"},"qps":{"double":228.23333333333332},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"192.*.*.170"},"qps":{"double":407.4},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.95"},"qps":{"double":233.6},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.8"},"qps":{"double":341.3833333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"192.*.*.167"},"qps":{"double":357.73333333333335},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"10.*.*.142"},"qps":{"double":283.05},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"192.*.*.98"},"qps":{"double":371.46666666666664},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"192.*.*.209"},"qps":{"double":501.35},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"10.*.*.12"},"qps":{"double":276.8333333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"10.*.*.98"},"qps":{"double":276.3833333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.169"},"qps":{"double":231.86666666666667},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"10.*.*.166"},"qps":{"double":358.9},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}

窗口:1550543220000 ~ 1550543280000
{"ip":{"string":"172.*.*.41"},"qps":{"double":675.3},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"192.*.*.97"},"qps":{"double":364.45},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"172.*.*.17"},"qps":{"double":392.98333333333335},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"192.*.*.234"},"qps":{"double":361.3833333333333},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"192.*.*.72"},"qps":{"double":354.98333333333335},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"172.*.*.3"},"qps":{"double":513.2833333333333},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.1"},"qps":{"double":435.18333333333334},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.240"},"qps":{"double":458.45},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.85"},"qps":{"double":500.46666666666664},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"172.*.*.234"},"qps":{"double":635.1333333333333},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"172.*.*.107"},"qps":{"double":371.76666666666665},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.195"},"qps":{"double":357.46666666666664},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.31"},"qps":{"double":178.95},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}

4. 小结

本文简要示例了如何在EMR上使用Spark Streaming SQL进行流式分析。需要注意的是,EMR Spark Streaming SQL处于预览版阶段,语法和功能还在不断丰富完善中。

目录
相关文章
|
1月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
151 6
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
86 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
37 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
46 0
|
3月前
|
存储 分布式计算 资源调度
Hadoop生态系统概览:从HDFS到Spark
【8月更文第28天】Hadoop是一个开源软件框架,用于分布式存储和处理大规模数据集。它由多个组件构成,旨在提供高可靠性、高可扩展性和成本效益的数据处理解决方案。本文将介绍Hadoop的核心组件,包括HDFS、MapReduce、YARN,并探讨它们如何与现代大数据处理工具如Spark集成。
247 0
|
4月前
|
SQL 分布式计算 MaxCompute
SQL开发问题之对于ODPS中的UNION操作,执行计划的问题如何解决
SQL开发问题之对于ODPS中的UNION操作,执行计划的问题如何解决
|
4月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 DataWorks
MaxCompute操作报错合集之使用sql查询一个表的分区数据时遇到报错,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
4月前
|
SQL 分布式计算 资源调度
MaxCompute操作报错合集之执行SQL Union All操作时,数据类型产生报错,该怎么解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
132 1