通过Spark SQL实时归档SLS数据

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。

我在前一篇文章介绍过基于Spark SQL实现对HDFS操作的实时监控报警。今天,我再举例说明一下如何使用Spark SQL进行流式应用的开发。本文主要分成三部分:

  • 流式计算和SQL
  • 简要介绍Spark SQL流式开发语法
  • 实时归档SLS数据到HDFS

1. 流式计算和SQL

数据的价值随着时间逐渐降低。及时尽早的对数据进行处理提升了数据的价值,所以流式计算系统的应用也越来越广泛。目前常用的流式计算框架有Storm,Spark Streaming及Flink等,也有Kafka Streams这类基于Kafka的流式处理类库。各种流式处理框架都有其各自的API,开发者不可避免的需要学习如何使用这些API。如何提供简单而有效的开发工具,从而把更多的精力投放在业务处理中。所以,各个流式处理系统都逐渐支持SQL API作为开发语言,让使用者可以像处理Table一样处理Stream。例如KSQL支持使用SQL进行流式处理Kafka数据。Spark同样提出来Structured Streaming作为最新一代的流式处理系统,底层的处理引擎也是Spark SQL。不过在上层SQL API,缺少Structured Streaming必要的功能,例如window,watermark等。EMR在Spark开源版本上进行了功能扩展,支持使用SQL API在Spark上进行完整的流式查询开发。

2. Spark SQL流式开发入门

这节将简要介绍Spark SQL中关于流式开的概念和语法。

2.1 建表

当我们需要对流式数据源进行读写操作时,需要首先创建一张表来表示这个数据源。定义表的语法如下:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);

以上语法中,针对特殊source,不要求一定指定表的列定义。当不指定列定义时,会自动识别数据源的schema信息。举一个例子:

CREATE TABLE driver_behavior 
USING kafka 
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

当数据源是Kafka时,会根据Kafka Topic名去到Kafka Schema Registry中查找schema信息。当然,我们也可以指定列定义,例如:

CREATE TABLE driverbehavior(deviceId string, velocity double)
USING kafka 
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}");

当指定列定义时,要求必须和Source中的字段定义是一致的。当执行完CREATE TABLE操作,表的定义会保存到Hive MetaStore中。

2.2 CTAS

我们可以将创建表和将查询结果写入到表的语句合并到一起,那么就是CREATE TABLE ... AS SELECT ...语法:

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING providerName
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*)
AS
queryStatement;

举一个例子(引用自这里: q103):

CREATE TABLE kafka_temp_table
USING kafka
OPTIONS (
kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
subscribe = "${TOPIC_NAME}",
output.mode = "${OUTPUT_MODE}",
kafka.schema.registry.url = "${SCHEMA_REGISTRY_URL}",
kafka.schema.record.name = "${SCHEMA_RECORD_NAME}",
kafka.schema.record.namespace = "${SCHEMA_RECORD_NAMESPACE}") AS
SELECT
  i_brand_id brand_id,
  i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim, kafka_store_sales, item
WHERE d_date_sk = ss_sold_date_sk
  AND ss_item_sk = i_item_sk
  AND i_manager_id = 28
  AND d_moy = 11
  AND d_year = 1999
  AND delay(ss_data_time) < '2 minutes'
GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id

当执行完操作,将创建出表并实际生成一个StreamQuery实例,将查询结果写入到结果表中。

2.3 DML

流式查询SQL和离线SQL标准语法大部分是一样,这边主要介绍insert操作。流式查询是不允许单独进行SELECT操作,必须将SELECT的查询结果写入到表中。所以,需要在SELECT操作之前执行INSERT操作。

INSERT INTO tbName[(columnName[,columnName]*)]
queryStatement;

以上语法为一次流式查询:这个语句将实际生成一个StreamQuery实例,将查询结果写入到结果表中。举一个例子:

INSERT INTO kafka_temp_table
SELECT
  i_brand_id brand_id,
  i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim, kafka_store_sales, item
WHERE d_date_sk = ss_sold_date_sk
  AND ss_item_sk = i_item_sk
  AND i_manager_id = 28
  AND d_moy = 11
  AND d_year = 1999
  AND delay(ss_data_time) < '2 minutes'
GROUP BY TUMBLING(ss_data_time, interval 1 minute), i_brand, i_brand_id

2.4 window及watermark

限于篇幅,本文暂且不介绍Spark SQL中如何使用window和watermak,有兴趣的可以先看看资料,后续会专门撰文介绍。

2.5 流式作业配置

使用SQL进行流式作业开发时,有些必要的配置无法在Query表达出来,需要单独进行设置。这里我们使用SET操作进行流式作业必要参数配置,当前有两个参数需要设置:

name config
流式查询实例名称 streaming.query.name
流式作业Checkpoint地址 spark.sql.streaming.checkpointLocation.${streaming.query.name}

每一个流式查询实例前都需要进行配置,也就是说,当使用CTAS或者Insert操作时,必须前置这两个配置。一个SQL文件支持多个流式查询,例如:

-- test.sql

SET streaming.query.name=query1;
SET spark.sql.streaming.checkpointLocation.query1=/tmp/spark/query1
INSERT INTO tbName1 [(columnName[,columnName]*)]
queryStatement1;

SET streaming.query.name=query2;
SET spark.sql.streaming.checkpointLocation.query2=/tmp/spark/query2
INSERT INTO tbName2 [(columnName[,columnName]*)]
queryStatement2;

3. SLS数据实时归档实战

假定一个场景,现在通过SLS收集了业务服务器上的日志,需要归档到HDFS中,便于后续进行离线分析。这里涉及到两个数据源:SLS和HDFS。HDFS是Spark官方支持的数据源,支持流和批的读写。SLS是阿里云的服务,EMR已经支持了流式读写。

  • 环境准备
    需要E-MapReduce 3.21.0以上版本集群环境,当前正在发布准备中,很快和大家见面,敬请期待。
  • 命令行
spark-sql --master yarn-client --conf spark.sql.streaming.datasource.provider=loghub --jars emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar

注:emr-logservice_shaded_2.11-1.7.0-SNAPSHOT.jar将会在EMR SDK 1.7.0版本发布出来。

  • 分别创建两张表:sls_service_log和hdfs_service_log
CREATE DATABASE IF NOT EXISTS default;
USE default;

DROP TABLE IF EXISTS hdfs_service_log;
CREATE TABLE hdfs_service_log (instance_name string, ip string, content string)
USING PARQUET
LOCATION '/tmp/hdfs_service_log';

DROP TABLE IF EXISTS sls_service_log;
CREATE TABLE sls_service_log
USING loghub
OPTIONS (
sls.project = "${logProjectName}",
sls.store = "${logStoreName}",
access.key.id = "${accessKeyId}",
access.key.secret = "${accessKeySecret}",
endpoint = "${endpoint}");
  • 通过Spark SQL启动一个Stream Query将SLS数据实时同步到HDFS中
set streaming.query.name=sync_sls_to_hdfs;
set spark.sql.streaming.checkpointLocation.sync_sls_to_hdfs=hdfs:///tmp/spark/sync_sls_to_hdfs;

INSERT INTO hdfs_service_log
select
__tag__hostname__ as instance_name,
ip,
content
from sls_service_log;
  • 查看HDFS数据归档情况

image

  • 使用Spark SQL对归档的数据进行离线分析:例如统计一共有多少个IP
select distinct(ip) from hdfs_service_log;

image

4. 结语

以上,我们介绍了Spark SQL在流式处理中的一个非常简单的例子。其实,我们还可以使用Spark SQL进行更加复杂的流式处理任务。后续文章,我将介绍窗口操作,watermark等概念,以及如何在流式数据上进行简单的机器学习运算。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
49 3
|
8天前
|
存储 Oracle 关系型数据库
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
本文介绍了MySQL InnoDB存储引擎中的数据文件和重做日志文件。数据文件包括`.ibd`和`ibdata`文件,用于存放InnoDB数据和索引。重做日志文件(redo log)确保数据的可靠性和事务的持久性,其大小和路径可由相关参数配置。文章还提供了视频讲解和示例代码。
114 11
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
|
15天前
|
SQL 存储 缓存
SQL Server 数据太多如何优化
11种优化方案供你参考,优化 SQL Server 数据库性能得从多个方面着手,包括硬件配置、数据库结构、查询优化、索引管理、分区分表、并行处理等。通过合理的索引、查询优化、数据分区等技术,可以在数据量增大时保持较好的性能。同时,定期进行数据库维护和清理,保证数据库高效运行。
|
8天前
|
SQL Oracle 关系型数据库
【赵渝强老师】Oracle的联机重做日志文件与数据写入过程
在Oracle数据库中,联机重做日志文件记录了数据库的变化,用于实例恢复。每个数据库有多组联机重做日志,每组建议至少有两个成员。通过SQL语句可查看日志文件信息。视频讲解和示意图进一步解释了这一过程。
|
30天前
|
SQL 移动开发 Oracle
SQL语句实现查询连续六天数据的方法与技巧
在数据库查询中,有时需要筛选出符合特定时间连续性条件的数据记录
|
1月前
|
SQL 数据库
为什么 SQL 日志文件很大,我应该如何处理?
为什么 SQL 日志文件很大,我应该如何处理?
|
1月前
|
SQL 存储 关系型数据库
添加数据到数据库的SQL语句详解与实践技巧
在数据库管理中,添加数据是一个基本操作,它涉及到向表中插入新的记录
|
1月前
|
SQL 数据挖掘 数据库
SQL查询每秒的数据:技巧、方法与性能优化
id="">SQL查询功能详解 SQL(Structured Query Language,结构化查询语言)是一种专门用于与数据库进行沟通和操作的语言
|
1月前
|
SQL 监控 数据处理
SQL数据库数据修改操作详解
数据库是现代信息系统的重要组成部分,其中SQL(StructuredQueryLanguage)是管理和处理数据库的重要工具之一。在日常的业务运营过程中,数据的准确性和及时性对企业来说至关重要,这就需要掌握如何在数据库中正确地进行数据修改操作。本文将详细介绍在SQL数据库中如何修改数据,帮助读者更好
199 4
|
1月前
|
SQL 数据库
为什么SQL日志文件很大,该如何处理?
为什么SQL日志文件很大,该如何处理?