数据预处理-数据解析-读取规则及加载到广播变量|学习笔记

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 快速学习数据预处理-数据解析-读取规则及加载到广播变量

开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第三阶段数据预处理-数据解析-读取规则及加载到广播变量】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/671/detail/11661


数据预处理-数据解析-读取规则及加载到广播变量


前面已经把数据解析的思路和目的确定过了,本节课学习到底该怎么实现这个流程。

首先,在 kafka 中把数据读出来,再从数据库当中读取解析的规则,尤其是查询性解析规则。因为我们现在要做的是查询业务数据的解析,所以要把查询的规则读取过来。

数据在 mysql 数据库当中,这涉及到了数据的读取流程。

image.png

先要在数据库当中把规则读到程序里,然后数据有了规则就可以去解析。把规则读取过来并把它放到广播变量里面,涉及到需要从 Redis 当中添加一个是否更新的标识,如果需要更新就走更新的流程,如果不需要更新就直接跳过。

然后用数据库的规则去进行过滤。过滤出对应的数据解析规则。

也就是从数据库当中把规则读过来,然后放到广播变量中,拿出所有查询的解析规则。在数据库当中。查询型用 Behaviortype 0 表示:

package com.air.antispider.stream.dataprocess.constants

//操作标记类别0-查询,1-预定,-1-其他

object BehaviorTypeEnum extends Enumeration{

type BehaviorTypeEnum = Value

val Query = Value(0,"Query")

val Book = value(1,"Book")

val Other = value(-1, "Other")}

那么 behavior_type 代表等于零的有三种情况.

image.png

那么这三种到底该使用哪一种去解析出发地,目的地和起飞时间这些数据?

也就是要过滤出对应的这条数据的解析规则,过滤完了以后就能够确定该用哪个解析,这样就从三个选择变成了一个选择。规则确定了,后面就开始进行解析。

第五步,数据库规则都在 analyzerule 这个表里面。

接下来看一下具体的实现思路。

先看查询性数据解析具体该怎么实现,按照什么顺序来:

查询类数据解析工作的目的就是解析出数据当中的出发地和目的地和起飞时间。具体的做法:

首先解析出 requestType 标记的一个业务场景,也就是国内查询、国际查询,国内预定、国际预定。这些在之前的课程已经解析完了,还要 travelType 也一样。这两个解析完以后,读取数据库内的解析规则到程序里边,这个是第一步。其实就是又读取数据库。第二步就是把这个规则加载到广播变量,第三步是对数据进行解析,一共就这三步。

第一步,读取数据库内的规则到我的程序当中。

image.png

将表内所有的查询规则全部都读取到程序里面。这里虽然查询的就这三条数据,但是里面有很多字段都能用得上。但是 book 的用不了。为了方便,要把这些数据一次性拿过来,包括后面的 depCity,arrCity 这些数据,以及前面的这些指标。解析的时候就要用这些指标从三个里面确定出到底用哪一个?

第一步读取数据库,那读取数据库的数据。

读取数据库的规则:

//读取数据分类规则(四种规则,每种单独读取)到预处理程厅

var RuleMaps=AnalyzeRuleDB.queryRuLeNap()

@volatile "var

broadcastRuleMaps=sc.broadcast(RuleMaps)

读取数据库要先把它放在程序初始化阶段:

//数据预处理的程序

def setupSsc(sc: SparkContext,kafkaPar

//程序初始化阶段

读过来以后,第二步需要将规则加载到广播分量里循环判断是否需要更新。

是否需要更新里也有很多步骤:更新流程在每一次循环里面,

val ssc=new StreamingContext(sc,Seconds(2))

这行代码表示每两秒钟迭代一次。每一次迭代都要去判断是否需要更新,具体流程就类似下面的代码,所以在这里面走第二步。

kafkaValue.foreachRDD( rdd=>{//迭代运行(每2秒运行一次)

//到 redis 读取是否需要更新的标记

val

NeedupDateFilterRule=redis.get("NeedupDateFilterRule")

//判断是否需要更新  若数据不为空并且数据转成 BooLean 为 true 表示需要更新if(!NeedupDateFilterRule.isEmpty && NeedupDateFilterRule.toBoolean){

//若需要更新,那么在数据库中重新读取新的过滤规则到程序中

filterRuleList=AnalyzeRuleDB.queryFilterRule()

//将广播变量清空

broadcastFilterRuleList.unpersist(

//将新的规则重新加载到广播变量

broadcastFilterRuleList= sc.broadcast(filterRuleList)

//将redis内是否需要更新规则的标识改为”false”

redis.set(“NeedupDateFilterRule"",false"))}

第三步,做数据解析。解析工作实际分为这三步:第一步,把它放在数据初始化阶段来执行。代码如下:

//读取数据库内的数据解析规则到预处理程序(将表内的所有查询规则数据全部读取到程序内)

//数据解析规则--查询类

var queryRule=AnalyzeRulgDB.queryRule(0)

@volatile var

broadcastQueryRules=sc.broadcast(queryRule)

//数据解析规则--预定类

var bookRule=AnalyzeRuleDB.queryRule(1)

@volatile var broadcastBookRules=sc.broadcast(bookRule)

查询类为0,预定类为1,AnalyzeRulgDB 方法还没有,说明还没有创建这个方法,创建。

代码如下:

image.png

这段代码就是来实现数据解析工作的。

能够看出里面很多爆红的,这里需要把它引进来。具体怎么做?

看下图的操作,AnalyzeRule 引入

com.air.antispider.streamcommon.bean.AnalyzeRule? Alt+Enter

image.png

同理:connect 引入 java.sql.Connection;Preparedstatement 引

入  java.sql.Preparedstatement;

ResultSet 引入 java.sql.ResultSet;c3p0UTil 引入 com.air.antispider.streamcommon.database.c3p0UTil

这样就不报错了,实现数据解析规则的读取的这个方法需要一个参数。

在 mysql 中解析出去的规则 Behaviortype  ,0是查询,1是预定。

看下面这行代码:

val sql: String = "select * from analyzerule where behavior_type =" + behaviorType

数据库当中有 behaviortype 这个表。现在要做查询型的业务需求。那么就把查询的规则查出来,也就是 behavior_type 等于零的查过来,有下面三种情况。

image.png

先看 behavior_type,sql 语句是:

val sql: String = "select * from analyzerule where

behavior_type =" + behaviorType

它等于一个值,这个值就是 int 类型的参数。

//读取数据库内的数据解析规则到预处理程序(将表内的所有查询规则数据全部读取到程序内)

//数据解析规则询类

var queryRule=AnalyzeRuleDB.queryRule( behaviorType = 0 )

@volatile  var broadcastQueryRules=sc.broadcast(queryRule)

//数揶解析规则--预定类

var bookRule=AnalyzeRuleDB.queryRuLe( behaviorType = 1)

@volatile var broadcastBookRules=sc.broadcast(bookRule)

这里面为什么要传个零进去,因为在这里要做的是查询型的数据,而查询型 behavior 在这里是零,所以就传一个零进去。所以把查询型的数据 behaviortype 等于零就查出来了。

前面不是用 queryDB 里面的 queryData 获取数据?

/ /国内查询规则读取

val nqRuleList=QueryDB.queryData(nqsQL,field)

/ /国际查询规则读取

val iqRuleList= QueryDB.queryData(iqsQL,field)

//国内预定规则读取

val nbRuleList= QueryDB.queryData(nbsQL,field)

//国际预定规则读取

val ibRuleList= QueryDe.queryData( ibsQL,field)

这样给个sql,给个字段就可以获取数据了,为什么要写一个 sql ,然后实现Connection,preparedStatement,ResultSet 为什么要通过这种方式?

因为这个属性,它这里面只能有一个字段。

Object QueryDB {

def queryData(sql: String,field: String):

ArrayBuffer[String] = {

//创建ab,用来封装数据

val arr = new ArrayBuffer[string](

//获取连接

val conn = c3pautil.getconnection[

//执行sqL语句

val ps = conn.preparestatement(sql)

val rs = ps.executeouery()

//封装数据

while (rs.next(){

arr.i=(rs.getstring(field))}

c3p0Util.close(conn,ps,rs)

/ /返回结果

arr

通过上面的代码发现,这个方法只能有一个字段:

arr.i=(rs.getstring(field))} 所以只有一个字段的时候可以用它。但是这里面是把所有的数据都查出来。所以不能用这个方法,只能用最原始的方法连接 preparedStatement 去查,查完以后拿到的结果就是一个一个的字段,然后这就解析完了,接下来就是去连接,然后执行并返回的结果。

代码如下:

conn = c3p0Util.getConnection

ps =conn.prepareStatement(sql)

rs = ps.executeQuery()

这里有一个 Analyzerule ,它是一个对象,来看一下这个类型,

点击进入:

这个是用来封装 analyzerule 的。也就是按这个表里的这些字段,里面的 id,flightType,BehaviorType等等全都是查询类的,下面也都是查询类的,然后把对应数据当中的这个表里的字段。把查过来的数据,获取 id 塞给这个对象的 id 里面。查过来的数据库当中的 flight_type 塞到 flightType 里面 ,behavior_type塞到BehaviorType当中:

val analyzeRule = new AnalyzeRule()

analyzeRule.id = rs.getstring( columnLabel = "id")

analyzeRule.flightType = rs.getstring( columnLabel = "flight_type").toInt

analyzeRule.BehaviorType = rs.getstring( columnLabel = "behavior_type”).toInt

analyzeRule.requestMatchExpression m rs.getstring( columnLabel =“requestNatchExpression")

把所有查出来的数据塞给 analyzeRule ,然后再塞到:analyzeRuleList += analyzeRule 里面。

然后转化成 tolist 进行返回:analyzeRuleList.toList,那么到这里就拿到了查询型的规则。

第二步,将规则加载到广播变量。

//将规则加载到广播变量,并循环判断是否需要更新

@volatile var broadcastQueryRules=sc.broadcast(queryRule)

查过来以后就已经加载到广播变量了,但是还没有启动更新。

//环判断是否需要更新(内含多个步骤,此处省略)

val needUpDataAnalyzeRule=redis.get("NeedupDataAnalyzeRule"")

//如果获取的数据是非空的,并且这个值是 true,那么就进行数据的更新操作(在数据库中重新读取数据加载到Redis )

if( !needupDataAnalyzeRule.isEmpty&&

needuUpDataAnalyzeRule.toBoolean){

//重新读取 mysql 的数据

queryRule=AnalyzeRuleDB.queryRule( behaviorType = 0)

bookRule=AnalyzeRuleDB.queryRule( behaviorType =1)

//清空广播变量中的数据

broadcastQueryRules.unpersist()

broadcastBookRules.unpersist()

//重新载入新的过滤数据

broadcastQueryRules=sc.broadcast(queryRule)

broadcastBookRules-sc.broadcast( bookRule)

//更新定华后,将 Redis中的true 改成 false

redis.set("NeedupDataAnalyzeRule" , "false"")}

在 redis 里面要加一个字段叫做 NeedupDataAnalyzeRule,读过来

之后要么是 true 要么是

False(!NeedupDataAnalyzeRule 不为空并且转化成布尔类型为 true 的时候表示需要重新读取规则,那么要重新读取两个规则:查询的和预定,重新清空两个广播变量。清空以后再重新将读过来的规则加载到广播变量里面,然后把 AnalyeRuleNeedUpData 更新掉,以 NeedupDataAnalyzeRule 为准。

这个有了,更新规就有了。现在要把它添加到广播变量里。这个做完

后要添加到我的 redis 里面了,操作如下图:

image.png

这样第二步就做完了,接下来是第三步:解析数据,数据怎么解析?

代码如下:

image.png

//对数据进行解析(在多种解断规则的情况下,确定最终使用哪一个规则进行解析)

val queryRequestData =

AnalyzeRequest.analyzeQueryRequest( requestTypeLabel,

requestMethod,contentType,request,requestBody,travelType, broadcastQueryRules.value)

这里面有几个参数显示红色,那是因为这个参数 requestTypeLabel 实际就是前面解析出来的 requestType ,这里把它删掉即可。request 就是前面解析出来的 url ,把它替换成 requesturl 就解决了。以上就是实际的解析过程。

相关文章
|
2月前
|
消息中间件 存储 缓存
十万订单每秒热点数据架构优化实践深度解析
【11月更文挑战第20天】随着互联网技术的飞速发展,电子商务平台在高峰时段需要处理海量订单,这对系统的性能、稳定性和扩展性提出了极高的要求。尤其是在“双十一”、“618”等大型促销活动中,每秒需要处理数万甚至数十万笔订单,这对系统的热点数据处理能力构成了严峻挑战。本文将深入探讨如何优化架构以应对每秒十万订单级别的热点数据处理,从历史背景、功能点、业务场景、底层原理以及使用Java模拟示例等多个维度进行剖析。
75 8
|
8天前
|
JSON 前端开发 搜索推荐
关于商品详情 API 接口 JSON 格式返回数据解析的示例
本文介绍商品详情API接口返回的JSON数据解析。最外层为`product`对象,包含商品基本信息(如id、name、price)、分类信息(category)、图片(images)、属性(attributes)、用户评价(reviews)、库存(stock)和卖家信息(seller)。每个字段详细描述了商品的不同方面,帮助开发者准确提取和展示数据。具体结构和字段含义需结合实际业务需求和API文档理解。
|
1天前
|
JSON 缓存 API
解析电商商品详情API接口系列,json数据示例参考
电商商品详情API接口是电商平台的重要组成部分,提供了商品的详细信息,支持用户进行商品浏览和购买决策。通过合理的API设计和优化,可以提升系统性能和用户体验。希望本文的解析和示例能够为开发者提供参考,帮助构建高效、可靠的电商系统。
20 12
|
6天前
|
存储 分布式计算 Hadoop
基于Java的Hadoop文件处理系统:高效分布式数据解析与存储
本文介绍了如何借鉴Hadoop的设计思想,使用Java实现其核心功能MapReduce,解决海量数据处理问题。通过类比图书馆管理系统,详细解释了Hadoop的两大组件:HDFS(分布式文件系统)和MapReduce(分布式计算模型)。具体实现了单词统计任务,并扩展支持CSV和JSON格式的数据解析。为了提升性能,引入了Combiner减少中间数据传输,以及自定义Partitioner解决数据倾斜问题。最后总结了Hadoop在大数据处理中的重要性,鼓励Java开发者学习Hadoop以拓展技术边界。
31 7
|
2月前
|
数据采集 自然语言处理 搜索推荐
基于qwen2.5的长文本解析、数据预测与趋势分析、代码生成能力赋能esg报告分析
Qwen2.5是一款强大的生成式预训练语言模型,擅长自然语言理解和生成,支持长文本解析、数据预测、代码生成等复杂任务。Qwen-Long作为其变体,专为长上下文场景优化,适用于大型文档处理、知识图谱构建等。Qwen2.5在ESG报告解析、多Agent协作、数学模型生成等方面表现出色,提供灵活且高效的解决方案。
262 49
|
1月前
|
XML JSON JavaScript
HttpGet 请求的响应处理:获取和解析数据
HttpGet 请求的响应处理:获取和解析数据
|
2月前
|
SQL 存储 Oracle
南大通用GBase 8s数据库游标变量解析:提升数据库操作效率
南大通用GBase 8s 数据库游标变量解析:提升数据库操作效率
|
2月前
|
存储 分布式计算 Java
存算分离与计算向数据移动:深度解析与Java实现
【11月更文挑战第10天】随着大数据时代的到来,数据量的激增给传统的数据处理架构带来了巨大的挑战。传统的“存算一体”架构,即计算资源与存储资源紧密耦合,在处理海量数据时逐渐显露出其局限性。为了应对这些挑战,存算分离(Disaggregated Storage and Compute Architecture)和计算向数据移动(Compute Moves to Data)两种架构应运而生,成为大数据处理领域的热门技术。
88 2
|
2月前
|
JavaScript API 开发工具
<大厂实战场景> ~ Flutter&鸿蒙next 解析后端返回的 HTML 数据详解
本文介绍了如何在 Flutter 中解析后端返回的 HTML 数据。首先解释了 HTML 解析的概念,然后详细介绍了使用 `http` 和 `html` 库的步骤,包括添加依赖、获取 HTML 数据、解析 HTML 内容和在 Flutter UI 中显示解析结果。通过具体的代码示例,展示了如何从 URL 获取 HTML 并提取特定信息,如链接列表。希望本文能帮助你在 Flutter 应用中更好地处理 HTML 数据。
146 1
|
2月前
|
数据采集 存储 自然语言处理
基于Qwen2.5的大规模ESG数据解析与趋势分析多Agent系统设计
2022年中国上市企业ESG报告数据集,涵盖制造、能源、金融、科技等行业,通过Qwen2.5大模型实现报告自动收集、解析、清洗及可视化生成,支持单/多Agent场景,大幅提升ESG数据分析效率与自动化水平。
155 0

热门文章

最新文章

推荐镜像

更多