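1. Introduction
E-MapReduce plans to offer a preview of Spark Streaming SQL starting with EMR-3.18.1. Spark Streaming SQL is a further layer on top of Spark Structured Streaming that lets users develop Spark streaming analytics in SQL. It benefits directly and transparently from the performance gains of Spark SQL optimizations, and it also inherits the constraints of Spark Structured Streaming: for example, Spark Structured Streaming does not support queries with multiple streaming aggregations, so Spark Streaming SQL does not support them either. For an introduction to EMR Spark Streaming SQL, see:
- EMR Spark Streaming SQL syntax reference
- Spark Streaming SQL Baseline Testing, a validation and testing tool that also works as a starting point for development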
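2. Use case overview
This article uses Spark Streaming SQL to compute real-time statistics over the HDFS audit log. The data flow is as follows:
It consists of four steps:
- Collect the HDFS audit log into LogService.
- Spark Streaming SQL consumes the data from LogService and analyzes it.
- The analysis results are written to a storage system.
- Downstream systems query and use the analysis results from the storage system. Typical downstream systems are reporting or monitoring and alerting.
This article focuses on the second step: how to analyze LogService data with Spark Streaming SQL.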
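3. Implementation
In this demonstration, the result data is written to Kafka.
3.1 Environment setup
Create two clusters in E-MapReduce: a Hadoop cluster and a Kafka cluster. Creating EMR clusters is not covered here; if you are unfamiliar with it, see the help documentation.
3.2 Creating the data tables
- Create the loghub source table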
CREATE DATABASE IF NOT EXISTS default;
USE default;
DROP TABLE IF EXISTS loghub_hdfs_auditlog;
CREATE TABLE loghub_hdfs_auditlog
USING loghub
OPTIONS (
sls.project = "${SLS_PROJECT_NAME}",
sls.store = "${SLS_STORE_NAME}",
access.key.id = "${ACCESS_KEY_ID}",
access.key.secret = "${ACCESS_KEY_SECRET}",
endpoint = "${SLS_ENDPOINT}",
zookeeper.connect.address = "${ZOOKEEPER_ADDRESS}");
- Create the Kafka output table
CREATE DATABASE IF NOT EXISTS default;
USE default;
DROP TABLE IF EXISTS kafka_hdfs_auditlog_analysis;
CREATE TABLE kafka_hdfs_auditlog_analysis
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = 'temp_hdfs_auditlog_analysis',
output.mode = 'complete',
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");
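Notes:
- Each analysis query needs its own Kafka result table, because each query produces a different result and therefore a different output table schema.
- Based on the result schema of the query, the Kafka topic schema must be registered in the Kafka Schema Registry in advance.
- Schema definition example: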
{"namespace": "org.apache.spark.emr.baseline.testing",
"type": "record",
"name": "TempResult",
"fields": [
{"name": "avg_ss_quantity", "type": ["double", "null"]},
{"name": "avg_ss_ext_sales_price", "type": [{"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 6}, "null"]},
{"name": "avg_ss_ext_wholesale_cost", "type": [{"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 6}, "null"]},
{"name": "sum_ss_ext_wholesale_cost", "type": [{"type": "bytes", "logicalType": "decimal", "precision": 17, "scale": 2}, "null"]}
]
}
Save the schema definition to a file, then register it with the Kafka Schema Registry using the script tool.
python ./schema_register.py ${SCHEMA_REGISTRY_URL} ${TOPIC_NAME} ${SCHEMA_FILE}
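The schema_register.py script itself is not shown in this article. As a rough sketch of what such a tool might do, the following assumes the registry exposes a Confluent-compatible REST API (POST /subjects/<subject>/versions) and that subjects follow the "<topic>-value" naming convention. Both are assumptions about the deployment, and the function names are hypothetical.

```python
import json
import urllib.request


def build_register_request(registry_url, topic, schema_text):
    """Build (but do not send) a request that registers a value schema for a topic."""
    # Fail early if the schema file does not contain well-formed JSON.
    json.loads(schema_text)
    # Assumption: subjects use the "<topic>-value" naming convention.
    subject = f"{topic}-value"
    url = f"{registry_url.rstrip('/')}/subjects/{subject}/versions"
    # The schema is sent as a JSON-encoded string inside a JSON object.
    body = json.dumps({"schema": schema_text}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


def register_schema(registry_url, topic, schema_file):
    """Read the schema file, send the registration request, return the response body."""
    with open(schema_file, encoding="utf-8") as f:
        request = build_register_request(registry_url, topic, f.read())
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")
```

Calling register_schema with the same three arguments as the command above would send the request. Splitting the request construction from the network call makes it possible to inspect the URL and payload before anything is sent.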
3.3 Counting operations by type in 5-minute windows
- Register the Kafka topic schema:
{"type":"record",
"name":"TempResult",
"namespace":"org.apache.spark.sql.streaming.test",
"fields":[
{"name":"op","type":["string","null"]},
{"name":"count","type":"long"},
{"name":"window_time","type":
{"type":"record",
"name":"window_time",
"namespace":"org.apache.spark.sql.streaming.test.window_time",
"fields":[
{"name":"start","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]},
{"name":"end","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]}
]
}
}
]
}
- Create a result table named "kafka_hdfs_auditlog_op_count_in_5_mins".
DROP TABLE IF EXISTS kafka_hdfs_auditlog_op_count_in_5_mins;
CREATE TABLE kafka_hdfs_auditlog_op_count_in_5_mins
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "kafka_hdfs_auditlog_op_count_in_5_mins",
output.mode = "append",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "TempResult",
kafka.schema.record.namespace = "org.apache.spark.sql.streaming.test");
- Run the query
SET streaming.query.name=hdfs_auditlog_op_count_in_5_mins;
SET spark.sql.streaming.checkpointLocation.hdfs_auditlog_op_count_in_5_mins=/tmp/spark/sql/streaming/hdfs_auditlog_op_count_in_5_mins;
INSERT INTO kafka_hdfs_auditlog_op_count_in_5_mins
SELECT cmd op, count(*) count, window window_time
FROM loghub_hdfs_auditlog
GROUP BY TUMBLING(__time__, interval 5 minute), op
HAVING delay(__time__) < '5 minutes';
Notes:
- "streaming.query.name" can be any name. Required.
- "spark.sql.streaming.checkpointLocation.${streaming.query.name}" sets the checkpoint path for the job named by "streaming.query.name". It can be any HDFS path. Required.
- View the results
Use the Kafka command-line tools to view the data in the kafka_hdfs_auditlog_op_count_in_5_mins topic:
Window: 1550493600000 ~ 1550493900000
{"op":{"string":"create"},"count":47438,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"delete"},"count":181197,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"getfileinfo"},"count":265451,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"listStatus"},"count":641205,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"mkdirs"},"count":12171,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"append"},"count":30981,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"rename"},"count":169709,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"setPermission"},"count":52164,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
{"op":{"string":"open"},"count":436877,"window_time":{"start":{"long":1550493600000},"end":{"long":1550493900000}}}
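The window boundaries in this output are multiples of 300000 ms, which is consistent with tumbling windows aligned to the Unix epoch. The sketch below reproduces that bucketing and the per-operation count in plain Python. It is only an illustration of the windowing arithmetic, assuming epoch alignment; it does not model the delay() condition or how the streaming engine emits results, and the function names are hypothetical.

```python
from collections import Counter

WINDOW_MS = 5 * 60 * 1000  # 5-minute tumbling window, in milliseconds


def window_bounds(ts_ms, size_ms=WINDOW_MS):
    """Return (start, end) of the epoch-aligned tumbling window containing ts_ms."""
    start = ts_ms - ts_ms % size_ms
    return start, start + size_ms


def count_ops(events, size_ms=WINDOW_MS):
    """Count events per (window, op); events are (timestamp_ms, op) pairs."""
    counts = Counter()
    for ts_ms, op in events:
        counts[(window_bounds(ts_ms, size_ms), op)] += 1
    return counts


# Made-up events: two fall in the first window, one in the next.
sample = [
    (1550493725000, "open"),
    (1550493601000, "open"),
    (1550493900000, "open"),
]
for (window, op), n in sorted(count_ops(sample).items()):
    print(window, op, n)
```

An event whose timestamp equals a window's end value belongs to the next window, because each window covers the half-open interval from start (inclusive) to end (exclusive).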
3.4 Counting open operations per source IP in 1-minute windows
- Register the Kafka topic schema:
{"type":"record",
"name":"TempResult",
"namespace":"org.apache.spark.sql.streaming.test",
"fields":[
{"name":"ip","type":["string","null"]},
{"name":"count","type":"long"},
{"name":"window_time","type":
{"type":"record",
"name":"window_time",
"namespace":"org.apache.spark.sql.streaming.test.window_time",
"fields":[
{"name":"start","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]},
{"name":"end","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]}
]
}
}
]
}
- Create a result table named "kafka_hdfs_auditlog_read_count_per_ip_in_1_mins".
DROP TABLE IF EXISTS kafka_hdfs_auditlog_read_count_per_ip_in_1_mins;
CREATE TABLE kafka_hdfs_auditlog_read_count_per_ip_in_1_mins
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "kafka_hdfs_auditlog_read_count_per_ip_in_1_mins",
output.mode = "append",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "TempResult",
kafka.schema.record.namespace = "org.apache.spark.sql.streaming.test");
- Run the query
SET streaming.query.name=hdfs_auditlog_read_count_per_ip_in_1_mins;
SET spark.sql.streaming.checkpointLocation.hdfs_auditlog_read_count_per_ip_in_1_mins=/tmp/spark/sql/streaming/hdfs_auditlog_read_count_per_ip_in_1_mins;
INSERT INTO kafka_hdfs_auditlog_read_count_per_ip_in_1_mins
SELECT ip, count(*) count, window window_time
FROM loghub_hdfs_auditlog
WHERE cmd = 'open'
GROUP BY TUMBLING(__time__, interval 1 minute), ip
HAVING delay(__time__) < '1 minutes';
- View the results
Window: 1550540940000 ~ 1550541000000
{"ip":{"string":"172.*.*.130"},"count":69090,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.1"},"count":69266,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"10.*.*.1"},"count":5129,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.52"},"count":70469,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"10.*.*.24"},"count":7206,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.48"},"count":136101,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.19"},"count":67226,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.82"},"count":141886,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.50"},"count":69165,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
{"ip":{"string":"172.*.*.54"},"count":151539,"window_time":{"start":{"long":1550540940000},"end":{"long":1550541000000}}}
Window: 1550541000000 ~ 1550541060000
{"ip":{"string":"192.*.*.22"},"count":77776,"window_time":{"start":{"long":1550541000000},"end":{"long":1550541060000}}}
{"ip":{"string":"10.*.*.111"},"count":9373,"window_time":{"start":{"long":1550541000000},"end":{"long":1550541060000}}}
{"ip":{"string":"10.*.*.2"},"count":9329,"window_time":{"start":{"long":1550541000000},"end":{"long":1550541060000}}}
Window: 1550541060000 ~ 1550541120000
{"ip":{"string":"172.*.*.207"},"count":10481,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}
{"ip":{"string":"192.*.*.138"},"count":28965,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}
{"ip":{"string":"10.*.*.160"},"count":22015,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}
{"ip":{"string":"172.*.*.234"},"count":32892,"window_time":{"start":{"long":1550541060000},"end":{"long":1550541120000}}}
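Each record in the topic wraps nullable fields in a single-key object such as {"string": ...} or {"long": ...}, and the window times are epoch milliseconds. A downstream consumer such as a report or an alerting job will usually want plain values and readable timestamps. The following sketch shows one way to flatten a record. The helper names are hypothetical, and it assumes the window start and end are never null.

```python
import json
from datetime import datetime, timezone


def unwrap(value):
    """Unwrap a union-encoded value such as {"string": "open"} or {"long": 1}."""
    if isinstance(value, dict) and len(value) == 1:
        (inner,) = value.values()
        return inner
    return value


def parse_record(line):
    """Flatten one result line into a plain dict with ISO 8601 UTC window times."""
    raw = json.loads(line)
    window = raw.pop("window_time")
    record = {key: unwrap(val) for key, val in raw.items()}
    for edge in ("start", "end"):
        # Assumption: start and end are present and non-null.
        millis = unwrap(window[edge])
        moment = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
        record[f"window_{edge}"] = moment.isoformat()
    return record


line = (
    '{"ip":{"string":"10.*.*.1"},"count":5129,"window_time":'
    '{"start":{"long":1550540940000},"end":{"long":1550541000000}}}'
)
print(parse_record(line))
```

The same function works for the records in the other sections, since it does not depend on the names of the non-window fields.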
3.5 Finding source IPs whose QPS exceeds 100 in 1-minute windows
- Register the Kafka topic schema:
{"type":"record",
"name":"TempResult",
"namespace":"org.apache.spark.sql.streaming.test",
"fields":[
{"name":"ip","type":["string","null"]},
{"name":"qps","type":["double","null"]},
{"name":"window_time","type":
{"type":"record",
"name":"window_time",
"namespace":"org.apache.spark.sql.streaming.test.window_time",
"fields":[
{"name":"start","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]},
{"name":"end","type":[{"type":"long","logicalType":"timestamp-millis"},"null"]}
]
}
}
]
}
- Create a result table named "kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins".
DROP TABLE IF EXISTS kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins;
CREATE TABLE kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins",
output.mode = "append",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "TempResult",
kafka.schema.record.namespace = "org.apache.spark.sql.streaming.test");
- Run the query
SET streaming.query.name=hdfs_auditlog_qps_gt_100_ip_in_1_mins;
SET spark.sql.streaming.checkpointLocation.hdfs_auditlog_qps_gt_100_ip_in_1_mins=/tmp/spark/sql/streaming/hdfs_auditlog_qps_gt_100_ip_in_1_mins;
INSERT INTO kafka_hdfs_auditlog_qps_gt_100_ip_in_1_mins
SELECT ip, qps, window window_time
FROM
  (SELECT ip, count(cmd)/60 qps, window
   FROM loghub_hdfs_auditlog
   GROUP BY TUMBLING(__time__, interval 1 minute), ip
   HAVING delay(__time__) < '1 minutes') t
WHERE qps > 100;
- View the results
Window: 1550543160000 ~ 1550543220000
{"ip":{"string":"10.*.*.140"},"qps":{"double":328.6333333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.13"},"qps":{"double":276.43333333333334},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.27"},"qps":{"double":228.23333333333332},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"192.*.*.170"},"qps":{"double":407.4},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.95"},"qps":{"double":233.6},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.8"},"qps":{"double":341.3833333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"192.*.*.167"},"qps":{"double":357.73333333333335},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"10.*.*.142"},"qps":{"double":283.05},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"192.*.*.98"},"qps":{"double":371.46666666666664},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"192.*.*.209"},"qps":{"double":501.35},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"10.*.*.12"},"qps":{"double":276.8333333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"10.*.*.98"},"qps":{"double":276.3833333333333},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"172.*.*.169"},"qps":{"double":231.86666666666667},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
{"ip":{"string":"10.*.*.166"},"qps":{"double":358.9},"window_time":{"start":{"long":1550543160000},"end":{"long":1550543220000}}}
Window: 1550543220000 ~ 1550543280000
{"ip":{"string":"172.*.*.41"},"qps":{"double":675.3},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"192.*.*.97"},"qps":{"double":364.45},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"172.*.*.17"},"qps":{"double":392.98333333333335},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"192.*.*.234"},"qps":{"double":361.3833333333333},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"192.*.*.72"},"qps":{"double":354.98333333333335},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"172.*.*.3"},"qps":{"double":513.2833333333333},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.1"},"qps":{"double":435.18333333333334},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.240"},"qps":{"double":458.45},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.85"},"qps":{"double":500.46666666666664},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"172.*.*.234"},"qps":{"double":635.1333333333333},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"172.*.*.107"},"qps":{"double":371.76666666666665},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.195"},"qps":{"double":357.46666666666664},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
{"ip":{"string":"10.*.*.31"},"qps":{"double":178.95},"window_time":{"start":{"long":1550543220000},"end":{"long":1550543280000}}}
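The query in this section works in two stages: the inner query counts requests per IP in each 1-minute window and divides by 60, and the outer query keeps only the rows whose QPS exceeds 100. In other words, an IP is reported when it makes more than 6000 requests in a window; a QPS of about 328.63, for instance, corresponds to 19718 requests. The sketch below mirrors those two stages in plain Python over in-memory events. It is an illustration only, with a hypothetical function name, and it ignores the delay() condition.

```python
from collections import Counter

WINDOW_SECONDS = 60


def high_qps_ips(events, threshold=100.0):
    """Return {(window_start_ms, ip): qps} for windows where qps exceeds threshold.

    events is an iterable of (timestamp_ms, ip) pairs.
    """
    window_ms = WINDOW_SECONDS * 1000
    # Stage 1: count requests per (epoch-aligned window, ip).
    counts = Counter((ts - ts % window_ms, ip) for ts, ip in events)
    # Stage 2: convert counts to QPS and keep only those above the threshold.
    result = {}
    for key, n in counts.items():
        qps = n / WINDOW_SECONDS
        if qps > threshold:
            result[key] = qps
    return result
```

Because the comparison is strict, an IP with exactly 6000 requests in a window (QPS of 100.0) is not reported, which matches the WHERE qps > 100 condition in the query.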
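4. Summary
This article briefly demonstrated how to use Spark Streaming SQL on EMR for streaming analysis. Note that EMR Spark Streaming SQL is still in preview, and its syntax and features are still being extended and refined.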