开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第五阶段:爬虫识别-main 方法及封装 processData 总结】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/673/detail/11690
爬虫识别-main 方法及封装 processData 总结
内容介绍:
一、main 方法
二、封装 processdata
一、main 方法
前面学习使用了爬虫识别的 main 方法,和爬虫的读取数据,还进行了数据拆分,分装成 process data。接下来进行总结。
第一是 main 方法爬虫识别的程序入口,数据预处理的爬虫识别,具体代码如下:
//爬虫识别程序的主入口
defmain(args: Array[String]): Unit=(
//添加日志级别的设置
LoggerLevels.setStreaminqloqLevels()
/当应用被停止的时候,进行如下设置可以保证当前批次执行完之后再停止应用。
System.setProperty("spark.streaming.stopGracefullyOnShutdown","true
//1、 创建 Spark conf
val conf=new
SparkConf().setAppName("RuleCompute").setMaster("local[2]”)
.set("spark.metrics.conf.executor.source.jvm.class","org.apache.spark.metrics.source.JvmSource")
//开启集群监控功能
//2、 创建 s parkContext
val sc=new Sparkcontext(conf)
kafkaparams: JMap[String, String],
topics: JSet[String]
val kafkaparams=Map("bootstrap.servers"->PropertiesUtil.getStringByKey( key="default.brokers",propName="kafkaConfig. val topics=Set(PropertiesUtil.getstringByKey(key="source.query.topic",propName="kafkaConfig.properties"))
爬虫识别的 rule compute方法:
目标:编写爬虫识别程序的主程序入口代码
写 RuleComput 的驱动类(RuleComputLauncher),在 RuleComput 中步骤如下
1、定义日志级别和批处理完成后任务结束的设置
2、在 main 方法中定义 conf、SparkContext
3、读取 kafka 的参数
4、创建 setupRuleComputSsc 方法
5、创建 streamingcontext
6、创建 kafka 消费的 DirectStream
7、消费数据
8、返回 SSC
定义日志级别和处理完以后再结束任务,出来最后一批以后再结束任务,然后再定义 main 方法的那个 conf 和sparkcontext,然后 kafka 里面的参数,然后创建 setuprulecomputssc,创建 kafka 的数据。然后最后返回一个ssc,是因为在这个 setuprulecomputssc 里面,进行返回,这个 main 方法里面需要接收,需要开启任务和循环。
关键代码如下:
1、定义日志级别和批处理完成后任务结束的设置
LoggerLevels.setStreamingLogLevels()
System.setProperty("spark.streaming.stopGracefu1lyOnShutdown", "true'
2.在 main 方法中定义 conf 和 sparkcontext
val conf=new
SparkConf().setAppName("RuleCompute").setMaster("local[2]")
.set("spark.metrics.conf.executor.source.jvm.class",
"org.apache.spark.metrics.source.JvmSource")
//开启集群监控功能
val sc=new SparkContext(conf)
上面需要注意的是开启集群监控,因为爬虫处理也需要去做一个监控。
3、读取 kafka 的参数
val kafkaParams=Map("bootstrap.servers"-
>PropertiesUtil.getStringByKey("default.brokers","kafkaConfig.properties")) val
topics=Set(Propertiesuti1.getStringByKey("source.query.topic","kafkaConfig.properties"))
4、创建 setupRuleComputSsc 方法
val ssc=setupRuleComputeSsc(sc,kafkaParams,topics)
5、创建 streamingcontext
val ssc=new StreamingContext(sc,Seconds(2))
6、创建 kafka 消费的 DirectStream
val kafkaDatas=
Kafkautils.createDirectStream[String,String,StringDecoder,StringDecoder] (ssc, kafkaParams,topics)
val kafkaValues= kafkaDatas.mas(_._2)
7、消费数据
kafkaValues.foreachRDD(rdd=>rdd.foreach(print1n))
8、返回 ssc
ssc
上述就是 rulecompute 实时计算的程序入口。
二、分装 process data
接下来对数据进行解析,将数据解析分装成 process data。
目标:经过预处理的数据存储在 Kafka 内,多个数据使用“#CS#”分割,后续计算会使用到各个数据,临时切分使用不便。为方便后续使用需将数据切分后封装成 Processedpata。
思路与关键代码
1、获取到 kafka 内的数据后,使用#CS#对数据进行切分。
简单来说就是拿到一批数据后,用#cs#进行切分,多复制过来一些数据,再将无关紧要的删除,遍历一个数据后,把中间处理的部分删除,在进行适当的添加。拿到数据之后,进行 mappartition,再调用 main 方法。上述即切分的过程,处理后的代码:
kafkaValues.mapPartitions partitionsIterator
partitionsIterator.map sourceline =>//分割数据
val dataArray = sourceline.split("#CS#",-1)...
2、将切分后的数据封装成 ProcessedData 并返回。
ProcessedData("",requestMethod, request,
remoteAddr, httpUserAgent, timeIso8601,
serverAddr,highFrqIPGroup,
requestType, travelType, requestParams,
cookieValue_JSESSIONID, cookieValue_USERID,
queryRequestData, bookRequestData, httpReferrer)
上述就是分装数据的全过程,分装数据的整体思路较为简单,就是将数据一个个进行拆分,然后分装成processdata。
以上就是数据的封装以及 main 方法实现的过程。