开发者学堂课程【大数据实战项目:反爬虫系统(Lua+Spark+Redis+Hadoop框架搭建):CreateDirectStream 消费数据】学习笔记与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/670/detail/11627
CreateDirectStream 消费数据
内容介绍:
一、CreateDirectStream 消费数据的步骤
二、CreateDirectStream 的代码实现
一、CreateDirectStream 消费数据的步骤
目标:掌握CreateDirectStream 消费数据的步骤
1、创建Spark conf
2、创建SparkContext
3、创建Streaming Context
4、读取kafka 内的数据ssc,kafkaParams,topics)
5、消费数据
6、开启 Streaming 任务+开启循环
二、CreateDirectStream 的代码实现
来到开发环境中,打开 ispider 并将其中的 main 关掉,找到test ,右键点击 scala 后,将复制出的CreateDirectStream 新建到Scala 的Object ,输入TestCreateDirectStream。
接下来处理消费数据的整个流程,用CreateDirectStream 的方法来读取并消费。
如下:
1、程序的入口
首先看代码逻辑,要执行该操作,就要创建出一个main 方法。
object TestCreateDirectStream {
//程序的入口
def main(args: Array[string]): Unit = {
//1、创建 spark conf
v
al
conf=new
SparkConf
().
setMaster
("local[2]").setAppName
("TestCreateDirectstream")
//2、创建 SparkContext
val sc=new SparkContext(conf)
//3、创建 streaming Context
val ssc=new StreamingContext(sc,Seconds(2))
//4、读取 kafka 内的数据 ssc,kafkaParams,topics)
Kafka
U
tils.createDirectstream()
//其中 createDirectstream() 会爆红,因为createDirectstream() 中需要很多参数,但实际里面没有参数。
查看缺的参数需要按ctrl ,会出现很多用法。需要用到(其中Class 参数是无用的):
d
ef
createDirectStream
[K, V, KD <: Decoder[K], VD <: Decoder[V]](
jssc: Java
S
treaming
C
ontext,
key
C
lass:
C
lass[K],
value
C
lass: Class[V],
keyDecoderClass: Class[KD],
valueDecoderClass: Class[VD],
kafka
P
arams: JMap[String, String],
topics: JSet[String
]
//实例kafkaParams
v
al
kafka
P
arams
=
Map
("bootstrap.servers
"->"
192.168.100.100:9092,192.168.100.110:9092,192.168.100.120:9
0
92")
//实例 topics
val topics=Set("test
0
1")
//接收数据
val kafkaDatas
=
Kafka
U
tils.createDirect
S
tream
[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//只获取value 数据
val kafka
V
alue=kafkaDatas.map(_._2)
//5、消费数据
kafka
V
alue.foreachRDD(rdd=>rdd.foreach(println))
//6、开启 streaming 任务+开启循环
ssc.start()
ssc.awaitTermination
()