Spark修炼之道(进阶篇)——Spark入门到精通:第十六节 Spark Streaming与Kafka

简介: 作者:周志湖 主要内容 Spark Streaming与Kafka版的WordCount示例(一) Spark Streaming与Kafka版的WordCount示例(二) 1. Spark Streaming与Kafka版本的WordCount示例 (一) 启动kafka集群 root@sparkslave02:/hadoopLearning/kafka_2
+关注继续查看

作者:周志湖

主要内容

  1. Spark Streaming与Kafka版的WordCount示例(一)
  2. Spark Streaming与Kafka版的WordCount示例(二)

1. Spark Streaming与Kafka版本的WordCount示例 (一)

  1. 启动kafka集群
root@sparkslave02:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties 
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties 
root@sparkmaster:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties 

向kafka集群发送消息

root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest
  1. 编写如下程序
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.log4j.{Level, Logger}

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{Logging, SparkConf}

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //创建ReceiverInputDStream
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

配置运行参数:
这里写图片描述
具体如下:

sparkmaster:2181  test-consumer-group kafkatopictest 1

sparkmaster:2181,zookeeper监听地址
test-consumer-group, consumer-group的名称,必须和$KAFKA_HOME/config/consumer.properties中的group.id的配置内容一致
kafkatopictest,topic名称
1,线程数

运行KafkaWordCount 后,在producer中输入下列内容

root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest
[2015-11-04 03:25:39,666] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
Spark
Spark TEST
TEST Spark Streaming

这里写图片描述

得到结果如下:
这里写图片描述

2. Spark Streaming与Kafka版本的WordCount示例(二)

前面的例子中,producer是通过kafka的脚本生成的,本例中将给出通过编写程序生成的producer

// 随机生成1-100间的数字
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper连接属性配置
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    //创建KafkaProducer
    val producer = new KafkaProducer[String, String](props)

    // 向kafka集群发送消息
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}

KafkaWordCountProducer 运行参数设置如下:

sparkmaster:9092 kafkatopictest 5 8

sparkmaster:9092,broker-list
kafkatopictest,top名称
5表示每秒发多少条消息
8表示每条消息中有几个单词

先KafkaWordCountProducer,然后再运行KafkaWordCount ,得到的计算结果如下:
这里写图片描述

目录
相关文章
|
1月前
|
消息中间件 存储 分布式计算
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
|
1月前
|
消息中间件 分布式计算 Kafka
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
|
2月前
|
消息中间件 安全 Java
kafka入门必备知识
Kafka是一个分布式流处理平台: 1. 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。 2. 可以储存流式的记录,并且有较好的容错性。 3. 可以在流式记录产生时就进行处理。
48 1
|
3月前
|
消息中间件 Kafka Windows
windows搭建kafka入门
windows搭建kafka入门
62 0
|
4月前
|
消息中间件 存储 Kafka
Kafka安装以及入门基本命令操作
Kafka安装以及入门基本命令操作
|
4月前
|
消息中间件 存储 算法
1.5万字长文:从 C# 入门 Kafka
1, 搭建 Kafka 环境 安装 docker-compose 单节点 Kafka 的部署 Kafka 集群的部署 2, Kafka 概念 基本概念 关于 Kafka 脚本工具 主题管理 使用 C# 创建分区 分区与复制 生产者消费者 修改配置 3, Kafka .NET 基础 生产者 批量生产 使用 Tasks.WhenAll 如何进行性能测试 消费 4,生产者 连接 Broker Key 分区 评估消息发送时间 生产者配置 acks bootstrap.servers retries enable.idempotence max.in.flight.requests.per.connec
149 0
|
4月前
|
消息中间件 存储 负载均衡
一篇搞定:消息队列概念、kafka入门、Kafka Golang客户端库
一篇搞定:消息队列概念、kafka入门、Kafka Golang客户端库
213 0
|
5月前
|
消息中间件 分布式计算 负载均衡
Kafka 入门知识,看这一篇就够了(上)
最近在学习 Kafka(别问,问就是公司在用),将学习过程中的笔记整理出来分享给大家,就当是入入门
|
7月前
|
消息中间件 网络协议 安全
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
【Kafka从入门到成神系列 八】Kafka 多线程消费者及TCP连接
|
7月前
|
消息中间件 分布式计算 Dubbo
【Kafka从入门到成神系列 七】Kafka 位移主题
【Kafka从入门到成神系列 七】Kafka 位移主题
【Kafka从入门到成神系列 七】Kafka 位移主题
推荐文章
更多