数据预处理-数据解析-读取规则及加载到广播变量|学习笔记

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云解析DNS-重点域名监控,免费拨测 20万次(价值200元)
简介: 快速学习数据预处理-数据解析-读取规则及加载到广播变量

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第三阶段数据预处理-数据解析-读取规则及加载到广播变量】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/671/detail/11661


数据预处理-数据解析-读取规则及加载到广播变量


前面已经把数据解析的思路和目的确定过了,本节课学习到底该怎么实现这个流程。

首先,在 kafka 中把数据读出来,再从数据库当中读取解析的规则,尤其是查询性解析规则。因为我们现在要做的是查询业务数据的解析,所以要把查询的规则读取过来。

数据在 mysql 数据库当中,这涉及到了数据的读取流程。

image.png

先要在数据库当中把规则读到程序里,然后数据有了规则就可以去解析。把规则读取过来并把它放到广播变量里面,涉及到需要从 Redis 当中添加一个是否更新的标识,如果需要更新就走更新的流程,如果不需要更新就直接跳过。

然后用数据库的规则去进行过滤。过滤出对应的数据解析规则。

也就是从数据库当中把规则读过来,然后放到广播变量中,拿出所有查询的解析规则。在数据库当中。查询型用 Behaviortype 0 表示:

package com.air.antispider.stream.dataprocess.constants

//操作标记类别0-查询,1-预定,-1-其他

object BehaviorTypeEnum extends Enumeration{

type BehaviorTypeEnum = Value

val Query = Value(0,"Query")

val Book = value(1,"Book")

val Other = value(-1, "Other")}

那么 behavior_type 代表等于零的有三种情况.

image.png

那么这三种到底该使用哪一种去解析出发地,目的地和起飞时间这些数据?

也就是要过滤出对应的这条数据的解析规则,过滤完了以后就能够确定该用哪个解析,这样就从三个选择变成了一个选择。规则确定了,后面就开始进行解析。

第五步,数据库规则都在 analyzerule 这个表里面。

接下来看一下具体的实现思路。

先看查询性数据解析具体该怎么实现,按照什么顺序来:

查询类数据解析工作的目的就是解析出数据当中的出发地和目的地和起飞时间。具体的做法:

首先解析出 requestType 标记的一个业务场景,也就是国内查询、国际查询,国内预定、国际预定。这些在之前的课程已经解析完了,还要 travelType 也一样。这两个解析完以后,读取数据库内的解析规则到程序里边,这个是第一步。其实就是又读取数据库。第二步就是把这个规则加载到广播变量,第三步是对数据进行解析,一共就这三步。

第一步,读取数据库内的规则到我的程序当中。

image.png

将表内所有的查询规则全部都读取到程序里面。这里虽然查询的就这三条数据,但是里面有很多字段都能用得上。但是 book 的用不了。为了方便,要把这些数据一次性拿过来,包括后面的 depCity,arrCity 这些数据,以及前面的这些指标。解析的时候就要用这些指标从三个里面确定出到底用哪一个?

第一步读取数据库,那读取数据库的数据。

读取数据库的规则:

//读取数据分类规则(四种规则,每种单独读取)到预处理程厅

var RuleMaps=AnalyzeRuleDB.queryRuLeNap()

@volatile "var

broadcastRuleMaps=sc.broadcast(RuleMaps)

读取数据库要先把它放在程序初始化阶段:

//数据预处理的程序

def setupSsc(sc: SparkContext,kafkaPar

//程序初始化阶段

读过来以后,第二步需要将规则加载到广播分量里循环判断是否需要更新。

是否需要更新里也有很多步骤:更新流程在每一次循环里面,

val ssc=new StreamingContext(sc,Seconds(2))

这行代码表示每两秒钟迭代一次。每一次迭代都要去判断是否需要更新,具体流程就类似下面的代码,所以在这里面走第二步。

kafkaValue.foreachRDD( rdd=>{//迭代运行(每2秒运行一次)

//到 redis 读取是否需要更新的标记

val

NeedupDateFilterRule=redis.get("NeedupDateFilterRule")

//判断是否需要更新  若数据不为空并且数据转成 BooLean 为 true 表示需要更新if(!NeedupDateFilterRule.isEmpty && NeedupDateFilterRule.toBoolean){

//若需要更新,那么在数据库中重新读取新的过滤规则到程序中

filterRuleList=AnalyzeRuleDB.queryFilterRule()

//将广播变量清空

broadcastFilterRuleList.unpersist(

//将新的规则重新加载到广播变量

broadcastFilterRuleList= sc.broadcast(filterRuleList)

//将redis内是否需要更新规则的标识改为”false”

redis.set(“NeedupDateFilterRule"",false"))}

第三步,做数据解析。解析工作实际分为这三步:第一步,把它放在数据初始化阶段来执行。代码如下:

//读取数据库内的数据解析规则到预处理程序(将表内的所有查询规则数据全部读取到程序内)

//数据解析规则--查询类

var queryRule=AnalyzeRulgDB.queryRule(0)

@volatile var

broadcastQueryRules=sc.broadcast(queryRule)

//数据解析规则--预定类

var bookRule=AnalyzeRuleDB.queryRule(1)

@volatile var broadcastBookRules=sc.broadcast(bookRule)

查询类为0,预定类为1,AnalyzeRulgDB 方法还没有,说明还没有创建这个方法,创建。

代码如下:

image.png

这段代码就是来实现数据解析工作的。

能够看出里面很多爆红的,这里需要把它引进来。具体怎么做?

看下图的操作,AnalyzeRule 引入

com.air.antispider.streamcommon.bean.AnalyzeRule? Alt+Enter

image.png

同理:connect 引入 java.sql.Connection;Preparedstatement 引

入  java.sql.Preparedstatement;

ResultSet 引入 java.sql.ResultSet;c3p0UTil 引入 com.air.antispider.streamcommon.database.c3p0UTil

这样就不报错了,实现数据解析规则的读取的这个方法需要一个参数。

在 mysql 中解析出去的规则 Behaviortype  ,0是查询,1是预定。

看下面这行代码:

val sql: String = "select * from analyzerule where behavior_type =" + behaviorType

数据库当中有 behaviortype 这个表。现在要做查询型的业务需求。那么就把查询的规则查出来,也就是 behavior_type 等于零的查过来,有下面三种情况。

image.png

先看 behavior_type,sql 语句是:

val sql: String = "select * from analyzerule where

behavior_type =" + behaviorType

它等于一个值,这个值就是 int 类型的参数。

//读取数据库内的数据解析规则到预处理程序(将表内的所有查询规则数据全部读取到程序内)

//数据解析规则询类

var queryRule=AnalyzeRuleDB.queryRule( behaviorType = 0 )

@volatile  var broadcastQueryRules=sc.broadcast(queryRule)

//数揶解析规则--预定类

var bookRule=AnalyzeRuleDB.queryRuLe( behaviorType = 1)

@volatile var broadcastBookRules=sc.broadcast(bookRule)

这里面为什么要传个零进去,因为在这里要做的是查询型的数据,而查询型 behavior 在这里是零,所以就传一个零进去。所以把查询型的数据 behaviortype 等于零就查出来了。

前面不是用 queryDB 里面的 queryData 获取数据?

/ /国内查询规则读取

val nqRuleList=QueryDB.queryData(nqsQL,field)

/ /国际查询规则读取

val iqRuleList= QueryDB.queryData(iqsQL,field)

//国内预定规则读取

val nbRuleList= QueryDB.queryData(nbsQL,field)

//国际预定规则读取

val ibRuleList= QueryDe.queryData( ibsQL,field)

这样给个sql,给个字段就可以获取数据了,为什么要写一个 sql ,然后实现Connection,preparedStatement,ResultSet 为什么要通过这种方式?

因为这个属性,它这里面只能有一个字段。

Object QueryDB {

def queryData(sql: String,field: String):

ArrayBuffer[String] = {

//创建ab,用来封装数据

val arr = new ArrayBuffer[string](

//获取连接

val conn = c3pautil.getconnection[

//执行sqL语句

val ps = conn.preparestatement(sql)

val rs = ps.executeouery()

//封装数据

while (rs.next(){

arr.i=(rs.getstring(field))}

c3p0Util.close(conn,ps,rs)

/ /返回结果

arr

通过上面的代码发现,这个方法只能有一个字段:

arr.i=(rs.getstring(field))} 所以只有一个字段的时候可以用它。但是这里面是把所有的数据都查出来。所以不能用这个方法,只能用最原始的方法连接 preparedStatement 去查,查完以后拿到的结果就是一个一个的字段,然后这就解析完了,接下来就是去连接,然后执行并返回的结果。

代码如下:

conn = c3p0Util.getConnection

ps =conn.prepareStatement(sql)

rs = ps.executeQuery()

这里有一个 Analyzerule ,它是一个对象,来看一下这个类型,

点击进入:

这个是用来封装 analyzerule 的。也就是按这个表里的这些字段,里面的 id,flightType,BehaviorType等等全都是查询类的,下面也都是查询类的,然后把对应数据当中的这个表里的字段。把查过来的数据,获取 id 塞给这个对象的 id 里面。查过来的数据库当中的 flight_type 塞到 flightType 里面 ,behavior_type塞到BehaviorType当中:

val analyzeRule = new AnalyzeRule()

analyzeRule.id = rs.getstring( columnLabel = "id")

analyzeRule.flightType = rs.getstring( columnLabel = "flight_type").toInt

analyzeRule.BehaviorType = rs.getstring( columnLabel = "behavior_type”).toInt

analyzeRule.requestMatchExpression m rs.getstring( columnLabel =“requestNatchExpression")

把所有查出来的数据塞给 analyzeRule ,然后再塞到:analyzeRuleList += analyzeRule 里面。

然后转化成 tolist 进行返回:analyzeRuleList.toList,那么到这里就拿到了查询型的规则。

第二步,将规则加载到广播变量。

//将规则加载到广播变量,并循环判断是否需要更新

@volatile var broadcastQueryRules=sc.broadcast(queryRule)

查过来以后就已经加载到广播变量了,但是还没有启动更新。

//环判断是否需要更新(内含多个步骤,此处省略)

val needUpDataAnalyzeRule=redis.get("NeedupDataAnalyzeRule"")

//如果获取的数据是非空的,并且这个值是 true,那么就进行数据的更新操作(在数据库中重新读取数据加载到Redis )

if( !needupDataAnalyzeRule.isEmpty&&

needuUpDataAnalyzeRule.toBoolean){

//重新读取 mysql 的数据

queryRule=AnalyzeRuleDB.queryRule( behaviorType = 0)

bookRule=AnalyzeRuleDB.queryRule( behaviorType =1)

//清空广播变量中的数据

broadcastQueryRules.unpersist()

broadcastBookRules.unpersist()

//重新载入新的过滤数据

broadcastQueryRules=sc.broadcast(queryRule)

broadcastBookRules-sc.broadcast( bookRule)

//更新定华后,将 Redis中的true 改成 false

redis.set("NeedupDataAnalyzeRule" , "false"")}

在 redis 里面要加一个字段叫做 NeedupDataAnalyzeRule,读过来

之后要么是 true 要么是

False(!NeedupDataAnalyzeRule 不为空并且转化成布尔类型为 true 的时候表示需要重新读取规则,那么要重新读取两个规则:查询的和预定,重新清空两个广播变量。清空以后再重新将读过来的规则加载到广播变量里面,然后把 AnalyeRuleNeedUpData 更新掉,以 NeedupDataAnalyzeRule 为准。

这个有了,更新规就有了。现在要把它添加到广播变量里。这个做完

后要添加到我的 redis 里面了,操作如下图:

image.png

这样第二步就做完了,接下来是第三步:解析数据,数据怎么解析?

代码如下:

image.png

//对数据进行解析(在多种解断规则的情况下,确定最终使用哪一个规则进行解析)

val queryRequestData =

AnalyzeRequest.analyzeQueryRequest( requestTypeLabel,

requestMethod,contentType,request,requestBody,travelType, broadcastQueryRules.value)

这里面有几个参数显示红色,那是因为这个参数 requestTypeLabel 实际就是前面解析出来的 requestType ,这里把它删掉即可。request 就是前面解析出来的 url ,把它替换成 requesturl 就解决了。以上就是实际的解析过程。

相关文章
|
9月前
|
数据采集 JSON 数据可视化
JSON数据解析实战:从嵌套结构到结构化表格
在信息爆炸的时代,从杂乱数据中提取精准知识图谱是数据侦探的挑战。本文以Google Scholar为例,解析嵌套JSON数据,提取文献信息并转换为结构化表格,通过Graphviz制作技术关系图谱,揭示文献间的隐秘联系。代码涵盖代理IP、请求头设置、JSON解析及可视化,提供完整实战案例。
560 4
JSON数据解析实战:从嵌套结构到结构化表格
|
10月前
|
存储 Linux iOS开发
Python入门:2.注释与变量的全面解析
在学习Python编程的过程中,注释和变量是必须掌握的两个基础概念。注释帮助我们理解代码的意图,而变量则是用于存储和操作数据的核心工具。熟练掌握这两者,不仅能提高代码的可读性和维护性,还能为后续学习复杂编程概念打下坚实的基础。
Python入门:2.注释与变量的全面解析
|
9月前
|
JSON 监控 网络协议
Bilibili直播信息流:连接方法与数据解析
本文详细介绍了自行实现B站直播WebSocket连接的完整流程。解析了基于WebSocket的应用层协议结构,涵盖认证包构建、心跳机制维护及数据包解析步骤,为开发者定制直播数据监控提供了完整技术方案。
|
9月前
|
机器学习/深度学习 JSON 算法
淘宝拍立淘按图搜索API接口系列的应用与数据解析
淘宝拍立淘按图搜索API接口是阿里巴巴旗下淘宝平台提供的一项基于图像识别技术的创新服务。以下是对该接口系列的应用与数据解析的详细分析
|
10月前
|
Java API 数据处理
深潜数据海洋:Java文件读写全面解析与实战指南
通过本文的详细解析与实战示例,您可以系统地掌握Java中各种文件读写操作,从基本的读写到高效的NIO操作,再到文件复制、移动和删除。希望这些内容能够帮助您在实际项目中处理文件数据,提高开发效率和代码质量。
249 4
|
10月前
|
数据采集 监控 搜索推荐
深度解析淘宝商品详情API接口:解锁电商数据新维度,驱动业务增长
淘宝商品详情API接口,是淘宝开放平台为第三方开发者提供的一套用于获取淘宝、天猫等电商平台商品详细信息的应用程序接口。该接口涵盖了商品的基本信息(如标题、价格、图片)、属性参数、库存状况、销量评价、物流信息等,是电商企业实现商品管理、市场分析、营销策略制定等功能的得力助手。
|
10月前
|
数据采集 前端开发 API
SurfGen爬虫:解析HTML与提取关键数据
SurfGen爬虫:解析HTML与提取关键数据
|
9月前
|
缓存 监控 搜索推荐
【实战解析】smallredbook.item_get_video API:小红书视频数据获取与电商应用指南
本文介绍小红书官方API——`smallredbook.item_get_video`的功能与使用方法。该接口可获取笔记视频详情,包括无水印直链、封面图、时长、文本描述、标签及互动数据等,并支持电商场景分析。调用需提供`key`、`secret`和`num_iid`参数,返回字段涵盖视频链接、标题、标签及用户信息等。同时,文章提供了电商实战技巧,如竞品监控与个性化推荐,并列出合规注意事项及替代方案对比。最后解答了常见问题,如笔记ID获取与视频链接时效性等。
|
9月前
|
存储 缓存 监控
如何高效爬取天猫商品数据?官方API与非官方接口全解析
本文介绍两种天猫商品数据爬取方案:官方API和非官方接口。官方API合法合规,适合企业长期使用,需申请企业资质;非官方接口适合快速验证需求,但需应对反爬机制。详细内容涵盖开发步骤、Python实现示例、反爬策略、数据解析与存储、注意事项及扩展应用场景。推荐工具链包括Playwright、aiohttp、lxml等。如需进一步帮助,请联系作者。
|
9月前
|
JSON API 数据格式
淘宝商品评论API接口系列的应用与数据解析
在电商平台中,用户评论是了解商品质量、服务水平和用户满意度的重要数据来源。淘宝作为中国最大的电商平台,提供了商品评论API接口,帮助开发者获取和分析用户评价数据。本文将介绍淘宝商品评论API接口系列的作用、使用方法,并通过示例展示如何调用API并解析返回的JSON数据。

推荐镜像

更多
  • DNS