数据预处理-数据解析-读取规则及加载到广播变量|学习笔记

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
全局流量管理 GTM,标准版 1个月
简介: 快速学习数据预处理-数据解析-读取规则及加载到广播变量

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第三阶段数据预处理-数据解析-读取规则及加载到广播变量】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/671/detail/11661


数据预处理-数据解析-读取规则及加载到广播变量


前面已经把数据解析的思路和目的确定过了,本节课学习到底该怎么实现这个流程。

首先,在 kafka 中把数据读出来,再从数据库当中读取解析的规则,尤其是查询性解析规则。因为我们现在要做的是查询业务数据的解析,所以要把查询的规则读取过来。

数据在 mysql 数据库当中,这涉及到了数据的读取流程。

image.png

先要在数据库当中把规则读到程序里,然后数据有了规则就可以去解析。把规则读取过来并把它放到广播变量里面,涉及到需要从 Redis 当中添加一个是否更新的标识,如果需要更新就走更新的流程,如果不需要更新就直接跳过。

然后用数据库的规则去进行过滤。过滤出对应的数据解析规则。

也就是从数据库当中把规则读过来,然后放到广播变量中,拿出所有查询的解析规则。在数据库当中。查询型用 Behaviortype 0 表示:

package com.air.antispider.stream.dataprocess.constants

//操作标记类别0-查询,1-预定,-1-其他

object BehaviorTypeEnum extends Enumeration{

type BehaviorTypeEnum = Value

val Query = Value(0,"Query")

val Book = value(1,"Book")

val Other = value(-1, "Other")}

那么 behavior_type 代表等于零的有三种情况.

image.png

那么这三种到底该使用哪一种去解析出发地,目的地和起飞时间这些数据?

也就是要过滤出对应的这条数据的解析规则,过滤完了以后就能够确定该用哪个解析,这样就从三个选择变成了一个选择。规则确定了,后面就开始进行解析。

第五步,数据库规则都在 analyzerule 这个表里面。

接下来看一下具体的实现思路。

先看查询性数据解析具体该怎么实现,按照什么顺序来:

查询类数据解析工作的目的就是解析出数据当中的出发地和目的地和起飞时间。具体的做法:

首先解析出 requestType 标记的一个业务场景,也就是国内查询、国际查询,国内预定、国际预定。这些在之前的课程已经解析完了,还要 travelType 也一样。这两个解析完以后,读取数据库内的解析规则到程序里边,这个是第一步。其实就是又读取数据库。第二步就是把这个规则加载到广播变量,第三步是对数据进行解析,一共就这三步。

第一步,读取数据库内的规则到我的程序当中。

image.png

将表内所有的查询规则全部都读取到程序里面。这里虽然查询的就这三条数据,但是里面有很多字段都能用得上。但是 book 的用不了。为了方便,要把这些数据一次性拿过来,包括后面的 depCity,arrCity 这些数据,以及前面的这些指标。解析的时候就要用这些指标从三个里面确定出到底用哪一个?

第一步读取数据库,那读取数据库的数据。

读取数据库的规则:

//读取数据分类规则(四种规则,每种单独读取)到预处理程厅

var RuleMaps=AnalyzeRuleDB.queryRuLeNap()

@volatile "var

broadcastRuleMaps=sc.broadcast(RuleMaps)

读取数据库要先把它放在程序初始化阶段:

//数据预处理的程序

def setupSsc(sc: SparkContext,kafkaPar

//程序初始化阶段

读过来以后,第二步需要将规则加载到广播分量里循环判断是否需要更新。

是否需要更新里也有很多步骤:更新流程在每一次循环里面,

val ssc=new StreamingContext(sc,Seconds(2))

这行代码表示每两秒钟迭代一次。每一次迭代都要去判断是否需要更新,具体流程就类似下面的代码,所以在这里面走第二步。

kafkaValue.foreachRDD( rdd=>{//迭代运行(每2秒运行一次)

//到 redis 读取是否需要更新的标记

val

NeedupDateFilterRule=redis.get("NeedupDateFilterRule")

//判断是否需要更新  若数据不为空并且数据转成 BooLean 为 true 表示需要更新if(!NeedupDateFilterRule.isEmpty && NeedupDateFilterRule.toBoolean){

//若需要更新,那么在数据库中重新读取新的过滤规则到程序中

filterRuleList=AnalyzeRuleDB.queryFilterRule()

//将广播变量清空

broadcastFilterRuleList.unpersist(

//将新的规则重新加载到广播变量

broadcastFilterRuleList= sc.broadcast(filterRuleList)

//将redis内是否需要更新规则的标识改为”false”

redis.set(“NeedupDateFilterRule"",false"))}

第三步,做数据解析。解析工作实际分为这三步:第一步,把它放在数据初始化阶段来执行。代码如下:

//读取数据库内的数据解析规则到预处理程序(将表内的所有查询规则数据全部读取到程序内)

//数据解析规则--查询类

var queryRule=AnalyzeRulgDB.queryRule(0)

@volatile var

broadcastQueryRules=sc.broadcast(queryRule)

//数据解析规则--预定类

var bookRule=AnalyzeRuleDB.queryRule(1)

@volatile var broadcastBookRules=sc.broadcast(bookRule)

查询类为0,预定类为1,AnalyzeRulgDB 方法还没有,说明还没有创建这个方法,创建。

代码如下:

image.png

这段代码就是来实现数据解析工作的。

能够看出里面很多爆红的,这里需要把它引进来。具体怎么做?

看下图的操作,AnalyzeRule 引入

com.air.antispider.streamcommon.bean.AnalyzeRule? Alt+Enter

image.png

同理:connect 引入 java.sql.Connection;Preparedstatement 引

入  java.sql.Preparedstatement;

ResultSet 引入 java.sql.ResultSet;c3p0UTil 引入 com.air.antispider.streamcommon.database.c3p0UTil

这样就不报错了,实现数据解析规则的读取的这个方法需要一个参数。

在 mysql 中解析出去的规则 Behaviortype  ,0是查询,1是预定。

看下面这行代码:

val sql: String = "select * from analyzerule where behavior_type =" + behaviorType

数据库当中有 behaviortype 这个表。现在要做查询型的业务需求。那么就把查询的规则查出来,也就是 behavior_type 等于零的查过来,有下面三种情况。

image.png

先看 behavior_type,sql 语句是:

val sql: String = "select * from analyzerule where

behavior_type =" + behaviorType

它等于一个值,这个值就是 int 类型的参数。

//读取数据库内的数据解析规则到预处理程序(将表内的所有查询规则数据全部读取到程序内)

//数据解析规则询类

var queryRule=AnalyzeRuleDB.queryRule( behaviorType = 0 )

@volatile  var broadcastQueryRules=sc.broadcast(queryRule)

//数揶解析规则--预定类

var bookRule=AnalyzeRuleDB.queryRuLe( behaviorType = 1)

@volatile var broadcastBookRules=sc.broadcast(bookRule)

这里面为什么要传个零进去,因为在这里要做的是查询型的数据,而查询型 behavior 在这里是零,所以就传一个零进去。所以把查询型的数据 behaviortype 等于零就查出来了。

前面不是用 queryDB 里面的 queryData 获取数据?

/ /国内查询规则读取

val nqRuleList=QueryDB.queryData(nqsQL,field)

/ /国际查询规则读取

val iqRuleList= QueryDB.queryData(iqsQL,field)

//国内预定规则读取

val nbRuleList= QueryDB.queryData(nbsQL,field)

//国际预定规则读取

val ibRuleList= QueryDe.queryData( ibsQL,field)

这样给个sql,给个字段就可以获取数据了,为什么要写一个 sql ,然后实现Connection,preparedStatement,ResultSet 为什么要通过这种方式?

因为这个属性,它这里面只能有一个字段。

Object QueryDB {

def queryData(sql: String,field: String):

ArrayBuffer[String] = {

//创建ab,用来封装数据

val arr = new ArrayBuffer[string](

//获取连接

val conn = c3pautil.getconnection[

//执行sqL语句

val ps = conn.preparestatement(sql)

val rs = ps.executeouery()

//封装数据

while (rs.next(){

arr.i=(rs.getstring(field))}

c3p0Util.close(conn,ps,rs)

/ /返回结果

arr

通过上面的代码发现,这个方法只能有一个字段:

arr.i=(rs.getstring(field))} 所以只有一个字段的时候可以用它。但是这里面是把所有的数据都查出来。所以不能用这个方法,只能用最原始的方法连接 preparedStatement 去查,查完以后拿到的结果就是一个一个的字段,然后这就解析完了,接下来就是去连接,然后执行并返回的结果。

代码如下:

conn = c3p0Util.getConnection

ps =conn.prepareStatement(sql)

rs = ps.executeQuery()

这里有一个 Analyzerule ,它是一个对象,来看一下这个类型,

点击进入:

这个是用来封装 analyzerule 的。也就是按这个表里的这些字段,里面的 id,flightType,BehaviorType等等全都是查询类的,下面也都是查询类的,然后把对应数据当中的这个表里的字段。把查过来的数据,获取 id 塞给这个对象的 id 里面。查过来的数据库当中的 flight_type 塞到 flightType 里面 ,behavior_type塞到BehaviorType当中:

val analyzeRule = new AnalyzeRule()

analyzeRule.id = rs.getstring( columnLabel = "id")

analyzeRule.flightType = rs.getstring( columnLabel = "flight_type").toInt

analyzeRule.BehaviorType = rs.getstring( columnLabel = "behavior_type”).toInt

analyzeRule.requestMatchExpression m rs.getstring( columnLabel =“requestNatchExpression")

把所有查出来的数据塞给 analyzeRule ,然后再塞到:analyzeRuleList += analyzeRule 里面。

然后转化成 tolist 进行返回:analyzeRuleList.toList,那么到这里就拿到了查询型的规则。

第二步,将规则加载到广播变量。

//将规则加载到广播变量,并循环判断是否需要更新

@volatile var broadcastQueryRules=sc.broadcast(queryRule)

查过来以后就已经加载到广播变量了,但是还没有启动更新。

//环判断是否需要更新(内含多个步骤,此处省略)

val needUpDataAnalyzeRule=redis.get("NeedupDataAnalyzeRule"")

//如果获取的数据是非空的,并且这个值是 true,那么就进行数据的更新操作(在数据库中重新读取数据加载到Redis )

if( !needupDataAnalyzeRule.isEmpty&&

needuUpDataAnalyzeRule.toBoolean){

//重新读取 mysql 的数据

queryRule=AnalyzeRuleDB.queryRule( behaviorType = 0)

bookRule=AnalyzeRuleDB.queryRule( behaviorType =1)

//清空广播变量中的数据

broadcastQueryRules.unpersist()

broadcastBookRules.unpersist()

//重新载入新的过滤数据

broadcastQueryRules=sc.broadcast(queryRule)

broadcastBookRules-sc.broadcast( bookRule)

//更新定华后,将 Redis中的true 改成 false

redis.set("NeedupDataAnalyzeRule" , "false"")}

在 redis 里面要加一个字段叫做 NeedupDataAnalyzeRule,读过来

之后要么是 true 要么是

False(!NeedupDataAnalyzeRule 不为空并且转化成布尔类型为 true 的时候表示需要重新读取规则,那么要重新读取两个规则:查询的和预定,重新清空两个广播变量。清空以后再重新将读过来的规则加载到广播变量里面,然后把 AnalyeRuleNeedUpData 更新掉,以 NeedupDataAnalyzeRule 为准。

这个有了,更新规就有了。现在要把它添加到广播变量里。这个做完

后要添加到我的 redis 里面了,操作如下图:

image.png

这样第二步就做完了,接下来是第三步:解析数据,数据怎么解析?

代码如下:

image.png

//对数据进行解析(在多种解断规则的情况下,确定最终使用哪一个规则进行解析)

val queryRequestData =

AnalyzeRequest.analyzeQueryRequest( requestTypeLabel,

requestMethod,contentType,request,requestBody,travelType, broadcastQueryRules.value)

这里面有几个参数显示红色,那是因为这个参数 requestTypeLabel 实际就是前面解析出来的 requestType ,这里把它删掉即可。request 就是前面解析出来的 url ,把它替换成 requesturl 就解决了。以上就是实际的解析过程。

相关文章
|
1月前
|
人工智能 自然语言处理 前端开发
SpringBoot + 通义千问 + 自定义React组件:支持EventStream数据解析的技术实践
【10月更文挑战第7天】在现代Web开发中,集成多种技术栈以实现复杂的功能需求已成为常态。本文将详细介绍如何使用SpringBoot作为后端框架,结合阿里巴巴的通义千问(一个强大的自然语言处理服务),并通过自定义React组件来支持服务器发送事件(SSE, Server-Sent Events)的EventStream数据解析。这一组合不仅能够实现高效的实时通信,还能利用AI技术提升用户体验。
168 2
|
26天前
|
自然语言处理 数据可视化 前端开发
从数据提取到管理:合合信息的智能文档处理全方位解析【合合信息智能文档处理百宝箱】
合合信息的智能文档处理“百宝箱”涵盖文档解析、向量化模型、测评工具等,解决了复杂文档解析、大模型问答幻觉、文档解析效果评估、知识库搭建、多语言文档翻译等问题。通过可视化解析工具 TextIn ParseX、向量化模型 acge-embedding 和文档解析测评工具 markdown_tester,百宝箱提升了文档处理的效率和精确度,适用于多种文档格式和语言环境,助力企业实现高效的信息管理和业务支持。
3990 5
从数据提取到管理:合合信息的智能文档处理全方位解析【合合信息智能文档处理百宝箱】
|
16天前
|
存储 分布式计算 Java
存算分离与计算向数据移动:深度解析与Java实现
【11月更文挑战第10天】随着大数据时代的到来,数据量的激增给传统的数据处理架构带来了巨大的挑战。传统的“存算一体”架构,即计算资源与存储资源紧密耦合,在处理海量数据时逐渐显露出其局限性。为了应对这些挑战,存算分离(Disaggregated Storage and Compute Architecture)和计算向数据移动(Compute Moves to Data)两种架构应运而生,成为大数据处理领域的热门技术。
38 2
|
22天前
|
JavaScript API 开发工具
<大厂实战场景> ~ Flutter&鸿蒙next 解析后端返回的 HTML 数据详解
本文介绍了如何在 Flutter 中解析后端返回的 HTML 数据。首先解释了 HTML 解析的概念,然后详细介绍了使用 `http` 和 `html` 库的步骤,包括添加依赖、获取 HTML 数据、解析 HTML 内容和在 Flutter UI 中显示解析结果。通过具体的代码示例,展示了如何从 URL 获取 HTML 并提取特定信息,如链接列表。希望本文能帮助你在 Flutter 应用中更好地处理 HTML 数据。
102 1
|
25天前
|
数据采集 机器学习/深度学习 数据挖掘
10种数据预处理中的数据泄露模式解析:识别与避免策略
在机器学习中,数据泄露是一个常见问题,指的是测试数据在数据准备阶段无意中混入训练数据,导致模型在测试集上的表现失真。本文详细探讨了数据预处理步骤中的数据泄露问题,包括缺失值填充、分类编码、数据缩放、离散化和重采样,并提供了具体的代码示例,展示了如何避免数据泄露,确保模型的测试结果可靠。
35 2
|
6天前
|
数据采集 存储 自然语言处理
基于Qwen2.5的大规模ESG数据解析与趋势分析多Agent系统设计
2022年中国上市企业ESG报告数据集,涵盖制造、能源、金融、科技等行业,通过Qwen2.5大模型实现报告自动收集、解析、清洗及可视化生成,支持单/多Agent场景,大幅提升ESG数据分析效率与自动化水平。
|
1月前
|
数据采集 XML 前端开发
Jsoup在Java中:解析京东网站数据
Jsoup在Java中:解析京东网站数据
|
22天前
|
JSON 前端开发 JavaScript
API接口商品详情接口数据解析
商品详情接口通常用于提供特定商品的详细信息,这些信息比商品列表接口中的信息更加详细和全面。以下是一个示例的JSON数据格式,用于表示一个商品详情API接口的响应。这个示例假定API返回一个包含商品详细信息的对象。
|
1月前
|
API
Vue3组件通信全解析:利用props、emit、provide/inject跨层级传递数据,expose与ref实现父子组件方法调用
Vue3组件通信全解析:利用props、emit、provide/inject跨层级传递数据,expose与ref实现父子组件方法调用
453 0
|
1月前
|
前端开发 算法 JavaScript
无界SaaS模式深度解析:算力算法、链接力、数据确权制度
私域电商的无界SaaS模式涉及后端开发、前端开发、数据库设计、API接口、区块链技术、支付和身份验证系统等多个技术领域。本文通过简化框架和示例代码,指导如何将核心功能转化为技术实现,涵盖用户管理、企业店铺管理、数据流量管理等关键环节。

推荐镜像

更多