通过Spark SQL实时归档SLS数据

简介: 我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。

我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。本文主要分成三部分:

  • 流式计算和SQL
  • 简要介绍Spark SQL流式开发语法
  • 实时归档SLS数据到HDFS

1. 流式计算和SQL

数据的价值随着时间逐渐降低。及时尽早的对数据进行处理提升了数据的价值,所以流式计算系统的应用也越来越广泛。目前常用的流式计算框架有Storm,Spark Streaming及Flink等,也有Kafka Streams这类基于Kafka的流式处理类库。各种流式处理框架都有其各自的API,开发者不可避免的需要学习如何使用这些API。如何提供简单而有效的开发工具,从而把更多的精力投放在业务处理中。所以,各个流式处理系统都逐渐支持SQL API作为开发语言,让使用者可以像处理Table一样处理Stream。例如KSQL支持使用SQL进行流式处理Kafka数据。Spark同样提出来Structured Streaming作为最新一代的流式处理系统,底层的处理引擎也是Spark SQL。不过在上层SQL API,缺少Structured Streaming必要的功能,例如window,watermark等。EMR在Spark开源版本上进行了功能扩展,支持使用SQL API在Spark上进行完整的流式查询开发。

2. Spark SQL流式开发入门

这节将简要介绍Spark SQL中关于流式开的概念和语法。

2.1 建表

当我们需要对流式数据源进行读写操作时,需要首先创建一张表来表示这个数据源。定义表的语法如下:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);

以上语法中,针对特殊source,不要求一定指定表的列定义。当不指定列定义时,会自动识别数据源的schema信息。举一个例子:

CREATE TABLE driver_behavior 
USING kafka 
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

当数据源是Kafka时,会根据Kafka Topic名去到Kafka Schema Registry中查找schema信息。当然,我们也可以指定列定义,例如:

CREATE TABLE driverbehavior(deviceId string, velocity double)
USING kafka 
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

当指定列定义时,要求必须和Source中的字段定义是一致的。当执行完CREATE TABLE操作,表的定义会保存到Hive MetaStore中。

2.2 CTAS

我们可以将创建表和将查询结果写入到表的语句合并到一起,那么就是CREATE TABLE ... AS SELECT ...语法:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*)
AS
queryStatement;

举一个例子(引用自这里: q103):

CREATE TABLE kafka_temp_table
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}") AS
SELECT
  i_brand_id brand_id,
  i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim, kafka_store_sales, item
WHERE d_date_sk = ss_sold_date_sk
  AND ss_item_sk = i_item_sk
  AND i_manager_id = 28
  AND d_moy = 11
  AND d_year = 1999
  AND delay(ss_data_time) < '2 minutes'
GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id

当执行完操作,将创建出表并实际生成一个StreamQuery实例,将查询结果写入到结果表中。

2.3 DML

流式查询SQL和离线SQL标准语法大部分是一样,这边主要介绍insert操作。流式查询是不允许单独进行SELECT操作,必须将SELECT的查询结果写入到表中。所以,需要在SELECT操作之前执行INSERT操作。

INSERT INTO tbName[(columnName[,columnName]*)]
queryStatement;

以上语法为一次流式查询:这个语句将实际生成一个StreamQuery实例,将查询结果写入到结果表中。举一个例子:

INSERT INTO kafka_temp_table
SELECT
  i_brand_id brand_id,
  i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim, kafka_store_sales, item
WHERE d_date_sk = ss_sold_date_sk
  AND ss_item_sk = i_item_sk
  AND i_manager_id = 28
  AND d_moy = 11
  AND d_year = 1999
  AND delay(ss_data_time) < '2 minutes'
GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id

2.4 window及watermark

限于篇幅,本文暂且不介绍Spark SQL中如何使用window和watermak,有兴趣的可以先看看资料,后续会专门撰文介绍。

2.5 流式作业配置

使用SQL进行流式作业开发时,有些必要的配置无法在Query表达出来,需要单独进行设置。这里我们使用SET操作进行流式作业必要参数配置,当前有两个参数需要设置:

name config
流式查询实例名称 streaming.query.name
流式作业Checkpoint地址 spark.sql.streaming.checkpointLocation.${streaming.query.name}

每一个流式查询实例前都需要进行配置,也就是说,当使用CTAS或者Insert操作时,必须前置这两个配置。一个SQL文件支持多个流式查询,例如:

-- test.sql

SET streaming.query.name=query1;
SET spark.sql.streaming.checkpointLocation.query1=/tmp/spark/query1
INSERT INTO tbName1 [(columnName[,columnName]*)]
queryStatement1;

SET streaming.query.name=query2;
SET spark.sql.streaming.checkpointLocation.query2=/tmp/spark/query2
INSERT INTO tbName2 [(columnName[,columnName]*)]
queryStatement2;

3. SLS数据实时归档实战

假定一个场景,现在通过SLS收集了业务服务器上的日志,需要归档到HDFS中,便于后续进行离线分析。这里涉及到两个数据源:SLS和HDFS。HDFS是Spark官方支持的数据源,支持流和批的读写。SLS是阿里云的服务,EMR已经支持了流式读写。

  • 环境准备
    需要E-MapReduce 3.21.0以上版本集群环境,当前正在发布准备中,很快和大家见面,敬请期待。
  • 命令行
spark-sql --master yarn-client --conf spark.sql.streaming.datasource.provider=loghub --jars emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar

注:emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar将会在EMR SDK 1.7.0版本发布出来。

  • 分别创建两张表:sls_service_log和hdfs_service_log
CREATE DATABASE IF NOT EXISTS default;
USE default;

DROP TABLE IF EXISTS hdfs_service_log;
CREATE TABLE hdfs_service_log (instance_name string, ip string, content string)
USING PARQUET
LOCATION '/tmp/hdfs_service_log';

DROP TABLE IF EXISTS sls_service_log;
CREATE TABLE sls_service_log
USING loghub
OPTIONS (
sls.project = "${logProjectName}",
sls.store = "${logStoreName}",
access.key.id = "${accessKeyId}",
access.key.secret = "${accessKeySecret}",
endpoint = "${endpoint}");
  • 通过Spark SQL启动一个Stream Query将SLS数据实时同步到HDFS中
set streaming.query.name=sync_sls_to_hdfs;
set spark.sql.streaming.checkpointLocation.sync_sls_to_hdfs=hdfs:///tmp/spark/sync_sls_to_hdfs;

INSERT INTO hdfs_service_log
select
__tag__hostname__ as instance_name,
ip,
content
from sls_service_log;
  • 查看HDFS数据归档情况

image

  • 使用Spark SQL对归档的数据进行离线分析:例如统计一共有多少个IP
select distinct(ip) from hdfs_service_log;

image

4. 结语

以上,我们介绍了Spark SQL在流式处理中的一个非常简单的例子。其实,我们还可以使用Spark SQL进行更加复杂的流式处理任务。后续文章,我将介绍窗口操作,watermark等概念,以及如何在流式数据上进行简单的机器学习运算。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
4天前
|
SQL 存储 Oracle
Oracle的PL/SQL定义变量和常量:数据的稳定与灵动
【4月更文挑战第19天】在Oracle PL/SQL中,变量和常量扮演着数据存储的关键角色。变量是可变的“魔术盒”,用于存储程序运行时的动态数据,通过`DECLARE`定义,可在循环和条件判断中体现其灵活性。常量则是不可变的“固定牌”,一旦设定值便保持不变,用`CONSTANT`声明,提供程序稳定性和易维护性。通过 `%TYPE`、`NOT NULL`等特性,可以更高效地管理和控制变量与常量,提升代码质量。善用两者,能优化PL/SQL程序的结构和性能。
|
23天前
|
SQL 存储 关系型数据库
一文搞懂SQL优化——如何高效添加数据
**SQL优化关键点:** 1. **批量插入**提高效率,一次性建议不超过500条。 2. **手动事务**减少开销,多条插入语句用一个事务。 3. **主键顺序插入**避免页分裂,提升性能。 4. **使用`LOAD DATA INFILE`**大批量导入快速。 5. **避免主键乱序**,减少不必要的磁盘操作。 6. **选择合适主键类型**,避免UUID或长主键导致的性能问题。 7. **避免主键修改**,保持索引稳定。 这些技巧能优化数据库操作,提升系统性能。
219 4
一文搞懂SQL优化——如何高效添加数据
|
1月前
|
SQL 数据可视化 数据处理
使用SQL和Python处理Excel文件数据
使用SQL和Python处理Excel文件数据
54 0
|
4天前
|
SQL Oracle 关系型数据库
Oracle的PL/SQL游标属性:数据的“导航仪”与“仪表盘”
【4月更文挑战第19天】Oracle PL/SQL游标属性如同车辆的导航仪和仪表盘,提供丰富信息和控制。 `%FOUND`和`%NOTFOUND`指示数据读取状态,`%ROWCOUNT`记录处理行数,`%ISOPEN`显示游标状态。还有`%BULK_ROWCOUNT`和`%BULK_EXCEPTIONS`增强处理灵活性。通过实例展示了如何在数据处理中利用这些属性监控和控制流程,提高效率和准确性。掌握游标属性是提升数据处理能力的关键。
|
4天前
|
SQL Oracle 安全
Oracle的PL/SQL循环语句:数据的“旋转木马”与“无限之旅”
【4月更文挑战第19天】Oracle PL/SQL中的循环语句(LOOP、EXIT WHEN、FOR、WHILE)是处理数据的关键工具,用于批量操作、报表生成和复杂业务逻辑。LOOP提供无限循环,可通过EXIT WHEN设定退出条件;FOR循环适用于固定次数迭代,WHILE循环基于条件判断执行。有效使用循环能提高效率,但需注意避免无限循环和优化大数据处理性能。掌握循环语句,将使数据处理更加高效和便捷。
|
4天前
|
SQL Oracle 关系型数据库
Oracle的PL/SQL条件控制:数据的“红绿灯”与“分岔路”
【4月更文挑战第19天】在Oracle PL/SQL中,IF语句与CASE语句扮演着数据流程控制的关键角色。IF语句如红绿灯,依据条件决定程序执行路径;ELSE和ELSIF提供多分支逻辑。CASE语句则是分岔路,按表达式值选择执行路径。这些条件控制语句在数据验证、错误处理和业务逻辑中不可或缺,通过巧妙运用能实现高效程序逻辑,保障数据正确流转,支持企业业务发展。理解并熟练掌握这些语句的使用是成为合格数据管理员的重要一环。
|
4天前
|
SQL Oracle 关系型数据库
Oracle的PL/SQL表达式:数据的魔法公式
【4月更文挑战第19天】探索Oracle PL/SQL表达式,体验数据的魔法公式。表达式结合常量、变量、运算符和函数,用于数据运算与转换。算术运算符处理数值计算,比较运算符执行数据比较,内置函数如TO_CHAR、ROUND和SUBSTR提供多样化操作。条件表达式如CASE和NULLIF实现灵活逻辑判断。广泛应用于SQL查询和PL/SQL程序,助你驾驭数据,揭示其背后的规律与秘密,成为数据魔法师。
|
27天前
|
SQL 关系型数据库 MySQL
SQL INSERT INTO order_record SELECT * FROM 从一张表查出数据插入到另一张表
SQL INSERT INTO order_record SELECT * FROM 从一张表查出数据插入到另一张表
14 0
|
29天前
|
SQL JSON Kubernetes
Seata常见问题之服务端 error日志没有输出,客户端执行sql报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
102 0
|
1月前
|
SQL 安全 数据库
第三章用sql语句操作数据
第三章用sql语句操作数据
10 0