Spark-stream基础---sparkStreaming和Kafka整合wordCount单词计数

简介: sprak-stream与kafak整合wordCount在IDEA上接收kafka传来的数据,并进行单词统计


项目

sprak-stream与kafak整合wordCount

在IDEA上接收kafka传来的数据,并进行单词统计

linux端打开kafka

//1.先打开zookeeper(3台)
zkServer.sh start 
//2.在打开kafka(3台)
 bin/kafka-server-start.sh config/server.properties &
//3.创建生产者
bin/kafka-console-producer.sh --broker-list hou-01:9092 --topic wc
//4.控制台输入任意单词

IDEA添加依赖

 

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

1.0版本单词计数

package day08
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
/*
需求:kafka消费数据到sparkStreaming计算
 */
object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    //1.创建StreamingContext
    val conf: SparkConf = new SparkConf().setAppName("kafkaWordCount").setMaster("local[2]")
    val ssc: StreamingContext = new StreamingContext(conf,Milliseconds(2000))
    //2.接入kafka数据源(如何访问kafka集群?zookeeper)
    val zkQuorm: String = "192.168.64.111,192.168.64.112,192.168.64.113"
    //访问组
    val groupID = "g1"
    //访问主题
    val topic: Map[String, Int] = Map[String,Int]("wc"->1)
    //创建Dstream
    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils
      .createStream(ssc,zkQuorm,groupID,topic)
    //3.处理数据
    val data: DStream[String] = kafkaStream.map(_._2)
    //4.启动streaming程序
    val r: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    r.print()
    ssc.start()
    //5.关闭资源
    ssc.awaitTermination()
  }
}

结果

image.png

2.0版本单词计数

将历史记录保存下来,显示出来,主要使用dataFunc

package day08
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
object StatusKafkaWordCount {
  //保持历史状态 wc 单词,次数 聚合的key
  //第一个类型:单词,第二个类型:在每一个分区中出现的次数累加的结果
  //第三个类型:是以前的结果
  val updateFunc = (iter:Iterator[(String,Seq[Int],Option[Int])]) => {
    //总的次数= 当前出现的次数 + 以前返回的结果
    iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
  }
  def main(args: Array[String]): Unit = {
    //1.创建程序入口
    val conf: SparkConf = new SparkConf().setAppName("StateKafkaWC").setMaster("local[2]")
    val ssc: StreamingContext = new StreamingContext(conf,Milliseconds(2000))
    //2.需要累加历史数据 checkpoints
    ssc.checkpoint("hdfs://192.168.64.111:9000/ck")
    //3.接入kafka数据源
    val zkQuorm: String = "192.168.64.111,192.168.64.112,192.168.64.113"
    //访问组
    val groupID = "g1"
    //访问主题
    val topic: Map[String, Int] = Map[String,Int]("wc"->1)
    //创建Dstream
    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils
      .createStream(ssc,zkQuorm,groupID,topic)
    //4.处理数据
    val data: DStream[String] = kafkaStream.map(_._2)
    //5.加入历史数据计算
    val r: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1))
      //参数1:自定义业务函数 参数2:分区器设置 参数3:是否使用
      .updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    //6.打印
    r.print()
    //7.启动程序
    ssc.start()
    //8.关闭资源
    ssc.awaitTermination()
  }
}

结果

image.png

相关文章
|
25天前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
34 0
|
25天前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
68 0
|
26天前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
17 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
25天前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
50 0
|
5月前
|
分布式计算 资源调度 Java
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
52 0
|
5月前
|
分布式计算 Hadoop Scala
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
45 0
|
5月前
|
消息中间件 分布式计算 关系型数据库
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
|
5月前
|
消息中间件 分布式计算 Kafka
利用Spark将Kafka数据流写入HDFS
利用Spark将Kafka数据流写入HDFS
|
消息中间件 分布式计算 Kafka
Sparkstreaming读取Kafka消息再结合SparkSQL,将结果保存到HBase
环境为CDH5.8,开发工具为IDEA,大数据目前最新的API,送给大家避免踩坑!!
10862 0
|
3天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
19 2
ClickHouse与大数据生态集成:Spark & Flink 实战