使用Spark SQL进行流式机器学习计算(上)

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 什么是流式机器学习, 机器学习模型获取途径, 系统演示

作者:余根茂,阿里巴巴计算平台事业部EMR团队的技术专家,参与了Hadoop,Spark,Kafka等开源项目的研发工作。目前主要专注于EMR流式计算产品的研发工作。

今天来和大家聊一下如何使用Spark SQL进行流式数据的机器学习处理。本文主要分为以下几个章节:

  • 什么是流式机器学习
  • 机器学习模型获取途径
  • 系统演示

1. 什么是流式机器学习

通常,当我们听到有人提到实时数据机器学习时,其实他们是讨论:

  • 他们希望有一个模型,这个模型利用最近历史信息来进行预测分析。举一个天气的例子,如果最近几天都是晴天,那么未来几天极小概率会出现雨雪和低温天气
  • 这个模型还需要是可更新的。当数据流经系统时,模型是可以随之进化升级。举个例子,随着业务规模的扩大,我们希望零售销售模型仍然保持准确。

第一个例子我们可以将它归为时序预测。第二个例子中,模型需要更新或者重新训练,这是一个non-stationarity问题。时序预测和non-stationarity数据分布是两类不同的问题。本文主要关注第二类问题,对于这类问题,一般的解决方案主要有:

  • 增量式算法:有一些算法支持通过数据逐步学习。也就是说,每次进来一些新的数据时,模型会被更新。SVM,神经网络等算法都有增量式版本,此外贝叶斯网络也可以用作增量学习。
  • 周期重新学习:一个更加直接的方法就是用一批最新数据重新训练我们的模型。这种方法可以用到的绝大多数的算法上。

2. 机器学习模型获取途径

实时机器学习应用分成两块,一部分是模型实时训练,另一部分是数据实时预测分析。现实中,我们可能没法实现模型的实时训练,只能退而求其次地使用已经训练好模型。这些模型可能会周期性地使用历史数据训练更新一次。所以,我们可以根据实际的算法和模型时效性要求,来选择实时训练模型还是使用预训练好的模型。

  • 模型算法支持增量训练:可以选择用流式数据实时训练更新
  • 模型算法不自持增量训练:可以选择用离线数据预先训练好模式

回到主题上,我们要实现使用Spark SQL进行流式机器学习。前面几篇文章已经简单介绍了EMR如何使用Spark SQL进行流式ETL处理。既然要进行机器学习,我们很自然地想到Spark MLlib。DataBricks有篇文档介绍了在Spark Structured Streaming进行机器学习,大家有兴趣的可以看下。如果想将Spark MLlib应用到Spark SQL上,我们可以简单地将MLlib算法包装成UDF使用。另外一个模型获取途径是利用阿里云上的一些在线机器学习服务,我们可以将在线机器学习服务使用UDF封装后使用。

  • 使用UDF封装现有的Spark MLlib算法
  • 使用UDF封装阿里云在线机器学习服务

限于篇幅,我会分两篇文章分别介绍这两个方式,本文将简单介绍如何利用Spark MLlib进行流式机器学习。

3. 系统演示

本节,我们将演示一下如何利用逻辑回归算法进行演示。

3.1 系统架构
下面这张图展示了整个实时监测系统的架构,前端接LogService数据,实时监测分析结果写入到RDS,最后通过DataV展示出来。
image

3.2 测试数据集
测试数据集使用Spark自带的sample_libsvm_data.txt,我们要做的是写一个数据生成器,将数据集的数据不断地向SLS中发送,模拟流式数据。

算法模型准备
Spark MLlib提供了大量的机器学习算法实现,可以方便的再RDD或者DataFrame API上使用,但是无法直接用在SQL API上,所以我们需要使用UDF来封装一下。这里,我们选用逻辑回归算法,具体的实现就不细说了,可以参考这里的代码:LogisticRegressionUDF.scala

3.4 部署测试

  • CLI
git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git
cd aliyun-emapreduce-sdk
git checkout -b master-2.x origin/master-2.x
mvn clean package -DskipTests

## 编译完后, assembly/target目录下会生成emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar

spark-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar --driver-class-path emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar
  • 建表
spark-sql> USE default;

-- 测试数据源
spark-sql> CREATE TABLE IF NOT EXISTS sls_dataset
USING loghub
OPTIONS (
sls.project = "${logProjectName}",
sls.store = "${logStoreName}",
access.key.id = "${accessKeyId}",
access.key.secret = "${accessKeySecret}",
endpoint = "${endpoint}");

spark-sql> DESC sls_dataset
__logProject__  string  NULL
__logStore__  string  NULL
__shard__ int NULL
__time__  timestamp NULL
__topic__ string  NULL
__source__  string  NULL
label string  NULL
features  string  NULL
__tag__hostname__ string  NULL
__tag__path__ string  NULL
__tag__receive_time__ string  NULL
Time taken: 0.058 seconds, Fetched 11 row(s)

-- 结果数据源
spark-sql> CREATE TABLE IF NOT EXISTS rds_result
USING jdbc2
OPTIONS (
url="${rdsUrl}",
driver="com.mysql.jdbc.Driver",
dbtable="${rdsTableName}",
user="${user}",
password="${password}",
batchsize="100",
isolationLevel="NONE");

spark-sql> DESC rds_result;
acc double  NULL
label double  NULL
time  string  NULL
Time taken: 0.457 seconds, Fetched 3 row(s)
  • 注册UDF

CREATE FUNCTION Logistic_Regression AS 'org.apache.spark.sql.aliyun.udfs.ml.LogisticRegressionUDF' USING JAR '${udf_jar_path}';
  • 提交执行
SET spark.sql.streaming.checkpointLocation.lr_prediction=hdfs:///tmp/spark/lr_prediction;
SET spark.sql.streaming.query.outputMode.lr_prediction=update;
-- 由于DataSource是基于JDBC实现的,所以我们需要设置向RDS表插入数据的SQL
-- 这里我的RDS表名是`result`
SET streaming.query.lr_prediction.sql=insert into `result`(`time`, `label`, `acc`) values(?, ?, ?);

INSERT INTO 
rds_result 
SELECT 
window.start, 
label, 
sum(if(tb.predict = tb.label, 1, 0)) / count(tb.label) as acc 
FROM(
SELECT 
default.Logistic_Regression("${LR_model_path}", concat_ws(" ", label, features)) as predict, 
label, 
__time__ as time 
FROM sls_dataset) tb 
GROUP BY TUMBLING(tb.time, interval 10 second), tb.label;

3.5 效果展示
在DataV中配置上面的RDS结果表,使用折线图查看label=1的预测准确率,如下:
image

4. 小结

本文简要介绍了流式机器学习面临的几个问题,以及相应的解决方法。并使用Spark SQL结合Spark MLlib演示了一个流式机器学习的案例。下一篇,我会简要介绍Spark SQL如何结合阿里云的在线机器学习服务来进行流式机器学习应用开发。

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
1月前
|
SQL 存储 分布式计算
|
1月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
37 1
|
18天前
|
Java 前端开发 容器
Struts 2:在技术变革的风暴中航行,探索框架的革命性未来与创新融合之路
【8月更文挑战第31天】Struts 2作为一款成熟的企业级Java Web框架,凭借其稳定性和灵活性自2007年以来广受欢迎。它基于MVC设计模式,支持插件架构和RESTful服务,并能与Spring框架良好集成。然而,面对微服务架构和容器化技术(如Docker和Kubernetes)的兴起,Struts 2需提供更轻量级和支持指南来适应变化。通过深化与现代前端框架(如React和Vue.js)及AI技术的集成,并强化安全性与开发工具,Struts 2有望保持竞争力并迎接未来挑战。
29 0
|
18天前
|
机器学习/深度学习 SQL 数据采集
"解锁机器学习数据预处理新姿势!SQL,你的数据金矿挖掘神器,从清洗到转换,再到特征工程,一网打尽,让数据纯净如金,模型性能飙升!"
【8月更文挑战第31天】在机器学习项目中,数据质量至关重要,而SQL作为数据预处理的强大工具,助力数据科学家高效清洗、转换和分析数据。通过去除重复记录、处理缺失值和异常值,SQL确保数据纯净;利用数据类型转换和字符串操作,SQL重塑数据结构;通过复杂查询生成新特征,SQL提升模型性能。掌握SQL,就如同拥有了开启数据金矿的钥匙,为机器学习项目奠定坚实基础。
24 0
|
27天前
|
SQL 机器学习/深度学习 开发工具
【机器学习 Azure Machine Learning】Azure Machine Learning 访问SQL Server 无法写入问题 (使用微软Python AML Core SDK)
【机器学习 Azure Machine Learning】Azure Machine Learning 访问SQL Server 无法写入问题 (使用微软Python AML Core SDK)
|
2月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之怎么编写和执行Spark SQL
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
28 0
|
4月前
|
SQL 分布式计算 关系型数据库
Spark 分析计算连续三周登录的用户数
本文介绍了如何使用窗口函数`range between`来查询`login_time`为2022-03-10的用户最近连续三周的登录数。首先在MySQL中创建`log_data`表并插入数据,接着定义需求为找出该日期前连续三周活跃的用户数。通过Spark SQL,分步骤实现:1)确定统计周期,2)筛选符合条件的数据,3)计算用户连续登录状态。在初始实现中出现错误,因未考虑日期在周中的位置,修正后正确计算出活跃用户数。
|
4月前
|
机器学习/深度学习 数据采集 分布式计算
【机器学习】Spark ML 对数据进行规范化预处理 StandardScaler 与向量拆分
标准化Scaler是数据预处理技术,用于将特征值映射到均值0、方差1的标准正态分布,以消除不同尺度特征的影响,提升模型稳定性和精度。Spark ML中的StandardScaler实现此功能,通过`.setInputCol`、`.setOutputCol`等方法配置并应用到DataFrame数据。示例展示了如何在Spark中使用StandardScaler进行数据规范化,包括创建SparkSession,构建DataFrame,使用VectorAssembler和StandardScaler,以及将向量拆分为列。规范化有助于降低特征重要性,提高模型训练速度和计算效率。
|
4月前
|
机器学习/深度学习 分布式计算 算法
【机器学习】Spark ML 对数据特征进行 One-Hot 编码
One-Hot 编码是机器学习中将离散特征转换为数值表示的方法,每个取值映射为一个二进制向量,常用于避免特征间大小关系影响模型。Spark ML 提供 OneHotEncoder 进行编码,输入输出列可通过 `inputCol` 和 `outputCol` 参数设置。在示例中,先用 StringIndexer 对类别特征编码,再用 OneHotEncoder 转换,最后展示编码结果。注意 One-Hot 编码可能导致高维问题,可结合实际情况选择编码方式。