开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop 框架搭建)第五阶段:爬虫识别-读取预处理后的数据到爬虫识别程序】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/673/detail/11688
爬虫识别-读取预处理后的数据到爬虫识别程序
内容介绍:
一、目的、需求与设计思路
二、编写代码
一、目的、需求与设计思路
目的:编写爬虫识别程序的主入口
需求:在之前的 dataprocess 数据处理中,我们已经将数据分为 query 和 book 数据分别存到了不同的 topic 中,只是没有存储数据。分别为:processedQuery 和 processedBook。
预定的数据是获取不到的,如果像预定的数据比如非常敏感的用户的身份证,身份证号码,手机号码,姓名这些东西,如果能拿到,那就实际上就是非常严重的数据泄露啊,也就是隐私泄露。所以重点是反爬虫泄露。其中processedQuery 是用来计算反爬虫指标的,而 processedBook 即预定数据,是用来计算反占座指标的,由于我们无法模拟订单数据,所以我们的重点是处理 processedQuery 从而计算防爬指标。在这个项目中,有一个反爬虫功能,还有一个反占座。
所以,我们需要读取 kafka 中的 processedQuery 数据到爬虫识别程序。
设计思路:
创建 SparkContext 和 SparkStreamingContext
加载 kafka 的配置信息
通过 DirectStream 的方式读取数据
二、编写代码
打开开发工具,数据预处理的代码已经编写好了,接下来就来看一下数据预处理,预处理是在 process 里面做的数据预处理,这里面有个 launch,然后里面这个是数据域出来的程序,而现在要做爬虫识别,那就在这个 compose 里面来进行操作,里面也有一个和上面结构相同的程序。在里面要创建一个 scala 的 object 来做爬虫识别的程序,创建好名字后保存,这就是用来做爬虫识别,用于实现爬虫识别的功能,这个是程序的主入口,爬虫识别的主程序。接下来目的是读取数据到编写程序入口读取数据,读取 kafka 里面的数据到设计的的程序里面来。上述爬虫识别的主入口配置完成。接下来按照步骤,是创建 SparkContext 和 SparkStreamingContext,这个程序在前面预处理阶段的代码编写已经使用过很多次,所以把能用的部分直接复制过来即可。
// 程序主入口
def main(args: Array[String]): Unit=(
//添加日志级别的设置
LoggerLevels.setStreaminqLogLevels()
1当应用被停止的时候,进行如下设置可以保证当前批次执行完之后再停止应用。
System.setProperty("spark.streaming.stopGracefullyOnShutdown","true"
//1、 创建 Spark conf
val conf=new
SparkConf().setAppName("DataProcess").setMaster("local[2]")
.set("spark.metrics.conf.executor.source.jvm.class","org.apache.spark.metrics.source.JvmSource")
//开启集群监控功能
//2、 创建 SparkContext
val sc=new SparkContext(conf)
kafkaParams: JMap[String, String],
topics:Jset[String]
val kafkaparams=Map("bootstrap.servers"->PropertiesUtil.getStringByKey( key="default.brokers",propName="kafkaconfig. val topics=Set(PropertiesUtil.getStringByKey(key="source.nginx.topic",propName="kafkaConfig.properties"))
上述中的设置日志级别、应用停止、创建应用spark conf、spark context、kafka的设置参数、topic上述中的内容基本都能用上,复制过来之后,进行修改。
日志级别不需要改,应用停止不需要改,appname 需要修改,原本是数据预处理的,而现在是爬虫识别,那我就用compose 来进行标记入口,这里面 master 可以不用改,后面这个开启监控功能也不修改,后面也需要做一个爬虫监控,这个监控先开启,再往后不需要改。配置参数 kafka 的集群,配置文件这个不需要改,topic需 要改,因为这个数据推送是第二个阶段,是3-1路径读取数在这里面读,而现在要在路径4里面进行修改。如下图:
生产者推送数据,查询到的数据,给他宽裕的投票,推送完后,再进行读取,这个爬虫识别的程序 topic,读取对处理后的查询数据,这个也是查询的数据特别之处,那么为何存在差异?下图中会发现,处理后的查询数据和推送查询数据是一样的,本质应该读 target query topic,但是现在要把数据预处理的就作为预处理,爬虫识别的就叫做爬虫识别,要进行区分,但是实际上值是一样的,所以这里面读取这个预测以后的这个查询拷贝过来保存。
default.key_serializer_class_config=org.apache.kafka.common.serialization.Stringserializer#一个批次提交数据大小
default.batch_size_config = 32768
#往kafka服务器提交消息问隔时间,0则立即提交不等待
default.linger_ms_config = 10
#消费者
#来自采集服务的原始数据
source.nginx.topic = B2CDATA_COLLECTION3
#处理后的查询数据
source.query.topic = processedQuery
#处理后的预订数据
source.book.topic = processedBook
#生产者
#推送查询数据
target.query.topic = processedQuery
#推送预订数据
target.book.topic = processedBook
Kafka 的配置文件不用改,也就是说数据预处理阶段的4,推送数据推送到了 topic 里面,而他的值超标,而现在爬虫识别要把它读出来,从路径5-1读出来。
//1、 创建 Spark conf
val conf=new
Sparkconf().setAppName("RuleCompute").setMaster("local[2]")
.set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
//开启集群监控功能//2、 创建SparkContext
val sc=new SparkContext(conf)
kafkaparams: JMap[String, String],
topics:JSet[String]
val kafkaParams=Map("bootstrap.servers"->PropertiesUtil.getstringByKey(key="default.brokers",propName="kafkaconfig. val topics=Set(Propertiesutil.getStringByKey( key="source.query.topic", propName="kafkaConfig.properties"))
//数据预处理的程序
val ssc=setupSsc(sc,kafkaParams,topics)
//6、 开启 Streaming 任务+开启循环
ssc.start()
ssc.awaitTermination()
set up ssc.,改成 rule compute ssc,创建一个新的,剪切到后续程序,作为爬虫识别的代码。
//爬虫识别代码
def setupRulecomputessc(sc: sparkcontext, kafkaparams: Map[String, String], topics: Set[String]): StreamingContext=(
//3 、创建 streaming Context
valssc=new StreamingContext(sc,Seconds(2))
//4 读取 kafka 数据
val kafkaDatas= KafkaUtils.createDirectStream[String,String,stringDecoder,StringDecoder](ssc,kafkaparams,topics)
// 获取 vatie 数据
val kafkaValues= kafkaDatas.map(_._2)
SSC
)
里面还没有 ssc,在爬虫识别里面的创建 streaming context,复制过来。再返回 ssc,会发现不报错。
接下来第四步进行读取kafka数据,调用 createdirectstream,调用 kafka 的 utils,补全,把参数传过来,第一个是streaming context,第二个是 params,第三个是 topics。返回的数据类型为 key-value 型。获取一个 value 数据,kafkavalues.foreachRDD(rdd=>)即消费数据。
/爬虫识别代码
defsetupRulecomputessc(sc: SparkContext,kafkaparams: Map[String, String],topics: Set[String]):StreamingContext =
//3 、创建 Streaming Context
val ssc=new Streamingcontext(sc,Seconds(2))
//4 读取 kafka 数据
val kafkaDatas=
Kafkautils.createDirectStream[String,String,StringDecoder,StringDecoder(ssc,kafkaParams,topics)
// 获取 value 数据
val kafkaValues= kafkaDatas.map(_·_2)
//5 消费数据
kafkaValues.foreachRDD(rdd=>rdd.foreach(println))
SSC
想要运行此程序,爬虫读取的数据依赖于爬虫处理后的数据,5-1的数据依赖于4,因此想要识别数据,得从路径4里面有数据进入,所以先把爬虫识别当中预处理的代码运行起来,再执行爬虫预处理,再来执行爬虫识别。
如上页面,在数据读取过程已经进行完毕,只是还未进行封装,爬虫识别过程中的读取就成功了,把俩个任务都停止运行。到此,数据预处理初步完成。