作者:周志湖
主要内容
- Spark Streaming与Kafka版的WordCount示例(一)
- Spark Streaming与Kafka版的WordCount示例(二)
1. Spark Streaming与Kafka版本的WordCount示例 (一)
- 启动kafka集群
root@sparkslave02:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties
root@sparkmaster:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties
向kafka集群发送消息
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest
- 编写如下程序
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{Logging, SparkConf}
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
//创建ReceiverInputDStream
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
配置运行参数:
具体如下:
sparkmaster:2181 test-consumer-group kafkatopictest 1
sparkmaster:2181,zookeeper监听地址
test-consumer-group, consumer-group的名称,必须和$KAFKA_HOME/config/consumer.properties中的group.id的配置内容一致
kafkatopictest,topic名称
1,线程数
运行KafkaWordCount 后,在producer中输入下列内容
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest
[2015-11-04 03:25:39,666] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
Spark
Spark TEST
TEST Spark Streaming
得到结果如下:
2. Spark Streaming与Kafka版本的WordCount示例(二)
前面的例子中,producer是通过kafka的脚本生成的,本例中将给出通过编写程序生成的producer
// 随机生成1-100间的数字
object KafkaWordCountProducer {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
"<messagesPerSec> <wordsPerMessage>")
System.exit(1)
}
val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
// Zookeeper连接属性配置
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
//创建KafkaProducer
val producer = new KafkaProducer[String, String](props)
// 向kafka集群发送消息
while(true) {
(1 to messagesPerSec.toInt).foreach { messageNum =>
val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
.mkString(" ")
val message = new ProducerRecord[String, String](topic, null, str)
producer.send(message)
}
Thread.sleep(1000)
}
}
}
KafkaWordCountProducer 运行参数设置如下:
sparkmaster:9092 kafkatopictest 5 8
sparkmaster:9092,broker-list
kafkatopictest,top名称
5表示每秒发多少条消息
8表示每条消息中有几个单词
先KafkaWordCountProducer,然后再运行KafkaWordCount ,得到的计算结果如下: