Project
Spark Streaming integrated with Kafka: WordCount
Receive the data sent from Kafka in IDEA and count the words.
Start Kafka on the Linux side
# 1. Start ZooKeeper first (on all 3 nodes)
zkServer.sh start
# 2. Then start Kafka (on all 3 nodes)
bin/kafka-server-start.sh config/server.properties &
# 3. Create a console producer
bin/kafka-console-producer.sh --broker-list hou-01:9092 --topic wc
# 4. Type arbitrary words into the console
Add the dependency in IDEA
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
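If the project is built with sbt instead of Maven, the equivalent declarations would look roughly like the sketch below. This is only an assumption about the build setup: the version number "2.2.0" and the Scala version are placeholders and should match the Spark/Scala versions actually used (the 0-8 integration must still be shipped by that Spark version).

// build.sbt (sketch; "2.2.0" and "2.11.8" are assumed versions)
scalaVersion := "2.11.8"

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                % sparkVersion,
  "org.apache.spark" %% "spark-streaming"           % sparkVersion,
  // %% appends the Scala suffix, resolving to spark-streaming-kafka-0-8_2.11 on Scala 2.11
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion
)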
Version 1.0: word count
package day08

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

/*
 * Requirement: consume data from Kafka and compute it in Spark Streaming
 */
object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create the StreamingContext
    val conf: SparkConf = new SparkConf().setAppName("kafkaWordCount").setMaster("local[2]")
    val ssc: StreamingContext = new StreamingContext(conf, Milliseconds(2000))
    // 2. Connect to the Kafka data source (the receiver-based API reaches the cluster through ZooKeeper)
    val zkQuorum: String = "192.168.64.111,192.168.64.112,192.168.64.113"
    // Consumer group
    val groupID = "g1"
    // Topic to subscribe to, with the number of receiver threads for it
    val topic: Map[String, Int] = Map[String, Int]("wc" -> 1)
    // Create the DStream
    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils
      .createStream(ssc, zkQuorum, groupID, topic)
    // 3. Process the data: each record is a (key, message) pair, we only need the message
    val data: DStream[String] = kafkaStream.map(_._2)
    // 4. Count the words and start the streaming program
    val r: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    r.print()
    ssc.start()
    // 5. Wait for termination (resources are released on shutdown)
    ssc.awaitTermination()
  }
}
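As a side note, the same spark-streaming-kafka-0-8 package also provides a receiver-less direct stream that reads from the Kafka brokers instead of going through ZooKeeper. The following is a minimal sketch of the same word count using createDirectStream; the broker address is an assumption based on the producer command above (hou-01:9092), and the object name is hypothetical.

package day08

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Sketch: the same word count using the direct (receiver-less) Kafka API
object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("directKafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(2000))

    // The direct stream talks to the brokers directly, so it needs broker addresses, not ZooKeeper
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "hou-01:9092")
    val topics = Set("wc")

    val stream: InputDStream[(String, String)] = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // Same processing as before: keep the message value and count words per batch
    val r: DStream[(String, Int)] = stream.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    r.print()

    ssc.start()
    ssc.awaitTermination()
  }
}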
Result: the word counts of each 2-second batch are printed to the IDEA console.
Version 2.0: word count
Keep the historical counts and display them as well. The key piece is the update function passed to updateStateByKey (a standalone sketch of this function follows the full program below).
package day08

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object StatusKafkaWordCount {
  // Keeps the historical state of the word count; the word is the aggregation key.
  // First element: the word; second: the counts of this word in each partition of the current batch;
  // third: the previously accumulated total.
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    // total = count in the current batch + previously returned result
    iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
  }

  def main(args: Array[String]): Unit = {
    // 1. Create the entry point
    val conf: SparkConf = new SparkConf().setAppName("StateKafkaWC").setMaster("local[2]")
    val ssc: StreamingContext = new StreamingContext(conf, Milliseconds(2000))
    // 2. Accumulating historical data requires a checkpoint directory
    ssc.checkpoint("hdfs://192.168.64.111:9000/ck")
    // 3. Connect to the Kafka data source
    val zkQuorum: String = "192.168.64.111,192.168.64.112,192.168.64.113"
    // Consumer group
    val groupID = "g1"
    // Topic to subscribe to, with the number of receiver threads for it
    val topic: Map[String, Int] = Map[String, Int]("wc" -> 1)
    // Create the DStream
    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils
      .createStream(ssc, zkQuorum, groupID, topic)
    // 4. Process the data: keep only the message value
    val data: DStream[String] = kafkaStream.map(_._2)
    // 5. Fold the historical state into the computation
    val r: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1))
      // arg 1: the custom update function; arg 2: the partitioner; arg 3: whether to remember the partitioner
      .updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // 6. Print
    r.print()
    // 7. Start the program
    ssc.start()
    // 8. Wait for termination (resources are released on shutdown)
    ssc.awaitTermination()
  }
}
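To make the shape of updateFunc concrete, here is a small Spark-free sketch that feeds it hand-made data of the same type (the words and counts are hypothetical). It shows how the per-batch counts are summed with the previous total carried in through Option[Int]:

object UpdateFuncDemo {
  // Same shape as updateFunc above: (word, counts of the word in the current batch, previous total)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) =>
    iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))

  def main(args: Array[String]): Unit = {
    // Batch 1: "hello" appears twice, "spark" once; there is no previous state yet
    val afterBatch1 = updateFunc(Iterator(
      ("hello", Seq(1, 1), None),
      ("spark", Seq(1), None)
    )).toMap
    println(afterBatch1) // Map(hello -> 2, spark -> 1)

    // Batch 2: "hello" appears once more, "spark" not at all; previous totals come in as Option[Int]
    val afterBatch2 = updateFunc(Iterator(
      ("hello", Seq(1), afterBatch1.get("hello")),
      ("spark", Seq.empty[Int], afterBatch1.get("spark"))
    )).toMap
    println(afterBatch2) // Map(hello -> 3, spark -> 1)
  }
}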
Result: the accumulated word counts across all batches so far are printed every 2 seconds.