数据预处理-数据解析-读取规则及加载到广播变量|学习笔记

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
全局流量管理 GTM,标准版 1个月
简介: 快速学习数据预处理-数据解析-读取规则及加载到广播变量

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第三阶段数据预处理-数据解析-读取规则及加载到广播变量】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/671/detail/11661


数据预处理-数据解析-读取规则及加载到广播变量


前面已经把数据解析的思路和目的确定过了,本节课学习到底该怎么实现这个流程。

首先,在 kafka 中把数据读出来,再从数据库当中读取解析的规则,尤其是查询性解析规则。因为我们现在要做的是查询业务数据的解析,所以要把查询的规则读取过来。

数据在 mysql 数据库当中,这涉及到了数据的读取流程。

image.png

先要在数据库当中把规则读到程序里,然后数据有了规则就可以去解析。把规则读取过来并把它放到广播变量里面,涉及到需要从 Redis 当中添加一个是否更新的标识,如果需要更新就走更新的流程,如果不需要更新就直接跳过。

然后用数据库的规则去进行过滤。过滤出对应的数据解析规则。

也就是从数据库当中把规则读过来,然后放到广播变量中,拿出所有查询的解析规则。在数据库当中。查询型用 Behaviortype 0 表示:

package com.air.antispider.stream.dataprocess.constants

//操作标记类别0-查询,1-预定,-1-其他

object BehaviorTypeEnum extends Enumeration{

type BehaviorTypeEnum = Value

val Query = Value(0,"Query")

val Book = value(1,"Book")

val Other = value(-1, "Other")}

那么 behavior_type 代表等于零的有三种情况.

image.png

那么这三种到底该使用哪一种去解析出发地,目的地和起飞时间这些数据?

也就是要过滤出对应的这条数据的解析规则,过滤完了以后就能够确定该用哪个解析,这样就从三个选择变成了一个选择。规则确定了,后面就开始进行解析。

第五步,数据库规则都在 analyzerule 这个表里面。

接下来看一下具体的实现思路。

先看查询性数据解析具体该怎么实现,按照什么顺序来:

查询类数据解析工作的目的就是解析出数据当中的出发地和目的地和起飞时间。具体的做法:

首先解析出 requestType 标记的一个业务场景,也就是国内查询、国际查询,国内预定、国际预定。这些在之前的课程已经解析完了,还要 travelType 也一样。这两个解析完以后,读取数据库内的解析规则到程序里边,这个是第一步。其实就是又读取数据库。第二步就是把这个规则加载到广播变量,第三步是对数据进行解析,一共就这三步。

第一步,读取数据库内的规则到我的程序当中。

image.png

将表内所有的查询规则全部都读取到程序里面。这里虽然查询的就这三条数据,但是里面有很多字段都能用得上。但是 book 的用不了。为了方便,要把这些数据一次性拿过来,包括后面的 depCity,arrCity 这些数据,以及前面的这些指标。解析的时候就要用这些指标从三个里面确定出到底用哪一个?

第一步读取数据库,那读取数据库的数据。

读取数据库的规则:

//读取数据分类规则(四种规则,每种单独读取)到预处理程厅

var RuleMaps=AnalyzeRuleDB.queryRuLeNap()

@volatile "var

broadcastRuleMaps=sc.broadcast(RuleMaps)

读取数据库要先把它放在程序初始化阶段:

//数据预处理的程序

def setupSsc(sc: SparkContext,kafkaPar

//程序初始化阶段

读过来以后,第二步需要将规则加载到广播分量里循环判断是否需要更新。

是否需要更新里也有很多步骤:更新流程在每一次循环里面,

val ssc=new StreamingContext(sc,Seconds(2))

这行代码表示每两秒钟迭代一次。每一次迭代都要去判断是否需要更新,具体流程就类似下面的代码,所以在这里面走第二步。

kafkaValue.foreachRDD( rdd=>{//迭代运行(每2秒运行一次)

//到 redis 读取是否需要更新的标记

val

NeedupDateFilterRule=redis.get("NeedupDateFilterRule")

//判断是否需要更新  若数据不为空并且数据转成 BooLean 为 true 表示需要更新if(!NeedupDateFilterRule.isEmpty && NeedupDateFilterRule.toBoolean){

//若需要更新,那么在数据库中重新读取新的过滤规则到程序中

filterRuleList=AnalyzeRuleDB.queryFilterRule()

//将广播变量清空

broadcastFilterRuleList.unpersist(

//将新的规则重新加载到广播变量

broadcastFilterRuleList= sc.broadcast(filterRuleList)

//将redis内是否需要更新规则的标识改为”false”

redis.set(“NeedupDateFilterRule"",false"))}

第三步,做数据解析。解析工作实际分为这三步:第一步,把它放在数据初始化阶段来执行。代码如下:

//读取数据库内的数据解析规则到预处理程序(将表内的所有查询规则数据全部读取到程序内)

//数据解析规则--查询类

var queryRule=AnalyzeRulgDB.queryRule(0)

@volatile var

broadcastQueryRules=sc.broadcast(queryRule)

//数据解析规则--预定类

var bookRule=AnalyzeRuleDB.queryRule(1)

@volatile var broadcastBookRules=sc.broadcast(bookRule)

查询类为0,预定类为1,AnalyzeRulgDB 方法还没有,说明还没有创建这个方法,创建。

代码如下:

image.png

这段代码就是来实现数据解析工作的。

能够看出里面很多爆红的,这里需要把它引进来。具体怎么做?

看下图的操作,AnalyzeRule 引入

com.air.antispider.streamcommon.bean.AnalyzeRule? Alt+Enter

image.png

同理:connect 引入 java.sql.Connection;Preparedstatement 引

入  java.sql.Preparedstatement;

ResultSet 引入 java.sql.ResultSet;c3p0UTil 引入 com.air.antispider.streamcommon.database.c3p0UTil

这样就不报错了,实现数据解析规则的读取的这个方法需要一个参数。

在 mysql 中解析出去的规则 Behaviortype  ,0是查询,1是预定。

看下面这行代码:

val sql: String = "select * from analyzerule where behavior_type =" + behaviorType

数据库当中有 behaviortype 这个表。现在要做查询型的业务需求。那么就把查询的规则查出来,也就是 behavior_type 等于零的查过来,有下面三种情况。

image.png

先看 behavior_type,sql 语句是:

val sql: String = "select * from analyzerule where

behavior_type =" + behaviorType

它等于一个值,这个值就是 int 类型的参数。

//读取数据库内的数据解析规则到预处理程序(将表内的所有查询规则数据全部读取到程序内)

//数据解析规则询类

var queryRule=AnalyzeRuleDB.queryRule( behaviorType = 0 )

@volatile  var broadcastQueryRules=sc.broadcast(queryRule)

//数揶解析规则--预定类

var bookRule=AnalyzeRuleDB.queryRuLe( behaviorType = 1)

@volatile var broadcastBookRules=sc.broadcast(bookRule)

这里面为什么要传个零进去,因为在这里要做的是查询型的数据,而查询型 behavior 在这里是零,所以就传一个零进去。所以把查询型的数据 behaviortype 等于零就查出来了。

前面不是用 queryDB 里面的 queryData 获取数据?

/ /国内查询规则读取

val nqRuleList=QueryDB.queryData(nqsQL,field)

/ /国际查询规则读取

val iqRuleList= QueryDB.queryData(iqsQL,field)

//国内预定规则读取

val nbRuleList= QueryDB.queryData(nbsQL,field)

//国际预定规则读取

val ibRuleList= QueryDe.queryData( ibsQL,field)

这样给个sql,给个字段就可以获取数据了,为什么要写一个 sql ,然后实现Connection,preparedStatement,ResultSet 为什么要通过这种方式?

因为这个属性,它这里面只能有一个字段。

Object QueryDB {

def queryData(sql: String,field: String):

ArrayBuffer[String] = {

//创建ab,用来封装数据

val arr = new ArrayBuffer[string](

//获取连接

val conn = c3pautil.getconnection[

//执行sqL语句

val ps = conn.preparestatement(sql)

val rs = ps.executeouery()

//封装数据

while (rs.next(){

arr.i=(rs.getstring(field))}

c3p0Util.close(conn,ps,rs)

/ /返回结果

arr

通过上面的代码发现,这个方法只能有一个字段:

arr.i=(rs.getstring(field))} 所以只有一个字段的时候可以用它。但是这里面是把所有的数据都查出来。所以不能用这个方法,只能用最原始的方法连接 preparedStatement 去查,查完以后拿到的结果就是一个一个的字段,然后这就解析完了,接下来就是去连接,然后执行并返回的结果。

代码如下:

conn = c3p0Util.getConnection

ps =conn.prepareStatement(sql)

rs = ps.executeQuery()

这里有一个 Analyzerule ,它是一个对象,来看一下这个类型,

点击进入:

这个是用来封装 analyzerule 的。也就是按这个表里的这些字段,里面的 id,flightType,BehaviorType等等全都是查询类的,下面也都是查询类的,然后把对应数据当中的这个表里的字段。把查过来的数据,获取 id 塞给这个对象的 id 里面。查过来的数据库当中的 flight_type 塞到 flightType 里面 ,behavior_type塞到BehaviorType当中:

val analyzeRule = new AnalyzeRule()

analyzeRule.id = rs.getstring( columnLabel = "id")

analyzeRule.flightType = rs.getstring( columnLabel = "flight_type").toInt

analyzeRule.BehaviorType = rs.getstring( columnLabel = "behavior_type”).toInt

analyzeRule.requestMatchExpression m rs.getstring( columnLabel =“requestNatchExpression")

把所有查出来的数据塞给 analyzeRule ,然后再塞到:analyzeRuleList += analyzeRule 里面。

然后转化成 tolist 进行返回:analyzeRuleList.toList,那么到这里就拿到了查询型的规则。

第二步,将规则加载到广播变量。

//将规则加载到广播变量,并循环判断是否需要更新

@volatile var broadcastQueryRules=sc.broadcast(queryRule)

查过来以后就已经加载到广播变量了,但是还没有启动更新。

//环判断是否需要更新(内含多个步骤,此处省略)

val needUpDataAnalyzeRule=redis.get("NeedupDataAnalyzeRule"")

//如果获取的数据是非空的,并且这个值是 true,那么就进行数据的更新操作(在数据库中重新读取数据加载到Redis )

if( !needupDataAnalyzeRule.isEmpty&&

needuUpDataAnalyzeRule.toBoolean){

//重新读取 mysql 的数据

queryRule=AnalyzeRuleDB.queryRule( behaviorType = 0)

bookRule=AnalyzeRuleDB.queryRule( behaviorType =1)

//清空广播变量中的数据

broadcastQueryRules.unpersist()

broadcastBookRules.unpersist()

//重新载入新的过滤数据

broadcastQueryRules=sc.broadcast(queryRule)

broadcastBookRules-sc.broadcast( bookRule)

//更新定华后,将 Redis中的true 改成 false

redis.set("NeedupDataAnalyzeRule" , "false"")}

在 redis 里面要加一个字段叫做 NeedupDataAnalyzeRule,读过来

之后要么是 true 要么是

False(!NeedupDataAnalyzeRule 不为空并且转化成布尔类型为 true 的时候表示需要重新读取规则,那么要重新读取两个规则:查询的和预定,重新清空两个广播变量。清空以后再重新将读过来的规则加载到广播变量里面,然后把 AnalyeRuleNeedUpData 更新掉,以 NeedupDataAnalyzeRule 为准。

这个有了,更新规就有了。现在要把它添加到广播变量里。这个做完

后要添加到我的 redis 里面了,操作如下图:

image.png

这样第二步就做完了,接下来是第三步:解析数据,数据怎么解析?

代码如下:

image.png

//对数据进行解析(在多种解断规则的情况下,确定最终使用哪一个规则进行解析)

val queryRequestData =

AnalyzeRequest.analyzeQueryRequest( requestTypeLabel,

requestMethod,contentType,request,requestBody,travelType, broadcastQueryRules.value)

这里面有几个参数显示红色,那是因为这个参数 requestTypeLabel 实际就是前面解析出来的 requestType ,这里把它删掉即可。request 就是前面解析出来的 url ,把它替换成 requesturl 就解决了。以上就是实际的解析过程。

相关文章
|
14天前
数据解析之xpath 太6了
数据解析之xpath 太6了
|
18天前
|
存储 弹性计算 缓存
阿里云服务器ECS通用型实例规格族特点、适用场景、指标数据解析
阿里云服务器ECS提供了多种通用型实例规格族,每种规格族都针对不同的计算需求、存储性能、网络吞吐量和安全特性进行了优化。以下是对存储增强通用型实例规格族g8ise、通用型实例规格族g8a、通用型实例规格族g8y、存储增强通用型实例规格族g7se、通用型实例规格族g7等所有通用型实例规格族的详细解析,包括它们的核心特点、适用场景、实例规格及具体指标数据,以供参考。
阿里云服务器ECS通用型实例规格族特点、适用场景、指标数据解析
|
10天前
|
消息中间件 canal 关系型数据库
Maxwell:binlog 解析器,轻松同步 MySQL 数据
Maxwell:binlog 解析器,轻松同步 MySQL 数据
82 11
|
2天前
|
前端开发 Python
解析数据的Beautiful Soup 模块(二)
解析数据的Beautiful Soup 模块(二)
10 1
|
16天前
|
数据采集 存储 JavaScript
构建您的第一个Python网络爬虫:抓取、解析与存储数据
【9月更文挑战第24天】在数字时代,数据是新的金矿。本文将引导您使用Python编写一个简单的网络爬虫,从互联网上自动抓取信息。我们将介绍如何使用requests库获取网页内容,BeautifulSoup进行HTML解析,以及如何将数据存储到文件或数据库中。无论您是数据分析师、研究人员还是对编程感兴趣的新手,这篇文章都将为您提供一个实用的入门指南。拿起键盘,让我们开始挖掘互联网的宝藏吧!
|
1月前
|
数据采集 存储 JavaScript
如何使用Cheerio与jsdom解析复杂的HTML结构进行数据提取
在现代网页开发中,复杂的HTML结构给爬虫技术带来挑战。传统的解析库难以应对,而Cheerio和jsdom在Node.js环境下提供了强大工具。本文探讨如何在复杂HTML结构中精确提取数据,结合代理IP、cookie、user-agent设置及多线程技术,提升数据采集的效率和准确性。通过具体示例代码,展示如何使用Cheerio和jsdom解析HTML,并进行数据归类和统计。这种方法适用于处理大量分类数据的爬虫任务,帮助开发者轻松实现高效的数据提取。
如何使用Cheerio与jsdom解析复杂的HTML结构进行数据提取
|
24天前
|
存储 关系型数据库 MySQL
技术解析:MySQL中取最新一条重复数据的方法
以上提供的两种方法都可以有效地从MySQL数据库中提取每个类别最新的重复数据。选择哪种方法取决于具体的使用场景和MySQL版本。子查询加分组的方法兼容性更好,适用于所有版本的MySQL;而窗口函数方法代码更简洁,执行效率可能更高,但需要MySQL 8.0及以上版本。在实际应用中,应根据数据量大小、查询性能需求以及MySQL版本等因素综合考虑,选择最合适的实现方案。
123 6
|
2天前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
17 0
|
2天前
|
XML 数据格式 开发者
解析数据的Beautiful Soup 模块(一)
解析数据的Beautiful Soup 模块(一)
16 0
|
1月前
|
存储 JSON API
Python编程:解析HTTP请求返回的JSON数据
使用Python处理HTTP请求和解析JSON数据既直接又高效。`requests`库的简洁性和强大功能使得发送请求、接收和解析响应变得异常简单。以上步骤和示例提供了一个基础的框架,可以根据你的具体需求进行调整和扩展。通过合适的异常处理,你的代码将更加健壮和可靠,为用户提供更加流畅的体验。
79 0

热门文章

最新文章

推荐镜像

更多