开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):预处理及识别代码架构介绍】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11630
数据预处理-预处理程序入口优化
数据预处理-预处理程序入口优化
1、数据预处理程序的主程序
object Data
P
rocessLauncher {
//程序主入口
def main(args: Array[String]): Unit ={
//添加日志级别设置
LoggerLevels.set
S
treamingLogLevels()
//当应用被停止的时候,进行如下设置可以保证当前批次执行完之后再停止应用。
System.set
P
roperty("spark.streaming.stopGracefully
O
n
S
hutdown","true")
//1、创建 Spark conf
v
al
conf=new
SparkConf
().setAppName
("
DataProcess
")
.setMaster(
“
local[2]
”
)
//开启日志监控功能
.set("spark.metrics.conf.executor.source.jvm.class",
“
org.apache.spark.metrics.source.JvmSource")
//开启集群监控功能
//2、创建SparkContext
val sc=new SparkContext(conf)
//3、创建streaming Context
val ssc=new StreamingContext(sc,Seconds(2))
//4、读取kafka 内的数据ssc,kafkaParams,topics)
//
jssc: Java
S
treaming
C
ontext,
//
kafka
P
arams: JMap[String, String],
//
topics: JSet[String
]
v
al
kafka
P
arams
=
Map
("bootstrap.servers
"->
P
ropertiesUtil.getstringBykey(key="default.broker
s
”
,propName = "kafkaConfig.properties"))
valtopics=Set(PropertiesUtil.getStringByKey(key=
”
source.nginx.topic
”
,propName=
”
kafkaConfig.properties
”
))
//接收kafka 的数据(key,value)
Kafka
U
tils.createDirect
S
tream
[String,String,
StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//真正的数据
val kafkaValue=kafkaData.map(_._2)
//5、消费数据
kafka
V
alue.foreachRDD(rdd=>rdd.foreach(println))
//数据预处理的程序
Val ssc=setupSsc(sc,kafkaParams,topics)
//6、开启 streaming 任务+开启循环
ssc.start()
ssc.awaitTermination
()
}
}
2、数据预处理的程序
defsetupSsc(s
s
c:Spark
C
ontext,kafka
P
arams:Map[String, String], topics: set[String]):
StreamingContext
=
{
//3、创建 streaming Context
val ssc=new StreamingContext(sc,Seconds(2))
//4、读取kafka 内的数据ssc,kafka
P
arams,topics)
//
jssc: Java
S
treaming
C
ontext,
//
kafka
P
arams: JMap[String, String],
//
topics: JSet[String
]
v
al
kafka
P
arams
=
Map
("bootstrap.servers
"->
P
ropertiesUtil.getstringBykey(key="default.broker
s
”
,propName = "kafkaConfig.properties"))
valtopics=Set(PropertiesUtil.getStringByKey(key
=
”
source.nginx.topic
”
,propName=
”
kafkaConfig.properties
”
))
//接收kafka 的数据(key,value)
Kafka
U
tils.createDirect
S
tream
[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//真正的数据
val kafkaValue=kafkaData.map(_._2)
//5、消费数据
kafka
V
alue.foreachRDD(rdd=>rdd.foreach(println))
Ssc
}
将以上代码进行运行。