Spark + MNS
Accessing MNS from Spark
The following example shows how Spark Streaming consumes data from MNS and counts the words in each batch.
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    // MnsUtils ships with the E-MapReduce SDK; verify the package path against your SDK version.
    import org.apache.spark.streaming.aliyun.mns.MnsUtils

    val conf = new SparkConf().setAppName("Test MNS Streaming")
    val batchInterval = Seconds(10)
    val ssc = new StreamingContext(conf, batchInterval)
    val queuename = "queuename"
    val accessKeyId = "<accessKeyId>"
    val accessKeySecret = "<accessKeySecret>"
    val endpoint = "http://xxx.yyy.zzzz/abc"
    // Pull messages from the MNS queue as raw bytes.
    val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
      StorageLevel.MEMORY_ONLY)
    // For each batch: decode the bytes, split into words, count them, and print the counts on the driver.
    mnsStream.foreachRDD( rdd => {
      rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
    })
    ssc.start()
    ssc.awaitTermination()
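The per-batch counting logic can be tried without a cluster or an MNS queue. The following is a minimal sketch that applies the same decode, split, and count steps to an in-memory batch using plain Scala collections. The object name `BatchWordCount`, the helper `countWords`, and the explicit UTF-8 decoding are assumptions made for illustration and are not part of the E-MapReduce SDK.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper for illustration; not part of the E-MapReduce SDK.
object BatchWordCount {
  // Counts the words in one batch of raw message bodies, following the same
  // map / flatMap / count-per-key steps that the streaming job applies to each RDD.
  def countWords(batch: Seq[Array[Byte]]): Map[String, Int] =
    batch
      .map(bytes => new String(bytes, StandardCharsets.UTF_8)) // assumption: bodies are UTF-8 text
      .flatMap(line => line.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    // Two sample message bodies standing in for one 10-second batch.
    val batch = Seq("hello mns", "hello spark").map(_.getBytes(StandardCharsets.UTF_8))
    countWords(batch).toSeq.sortBy(_._1).foreach { case (word, cnt) =>
      println(s"word: $word, cnt: $cnt")
    }
  }
}
```

For the two sample messages this prints a count of 2 for `hello` and 1 each for `mns` and `spark`. In the streaming job, `reduceByKey` plays the role of `groupBy` plus `size` here, with the work distributed across executors.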
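MetaService support
The example above passes the AccessKey (AK) to the API explicitly. Starting with E-MapReduce SDK 1.3.2, however, Spark Streaming can use MetaService to process MNS data without an AK. For details, see the description of the MnsUtils class in the E-MapReduce SDK: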
- MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
- MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)
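As a rough sketch of how the AK-free overload might be used, the job below calls the four-argument `createPullingStreamAsRawBytes` listed above and simply counts the messages in each batch. It assumes the job runs on an E-MapReduce cluster where MetaService is available, that the SDK version is 1.3.2 or later, and that `MnsUtils` lives in the package shown in the import. The object name `MnsMetaServiceExample` and the queue name and endpoint values are placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Assumption: package path of MnsUtils in the E-MapReduce SDK; verify against your SDK version.
import org.apache.spark.streaming.aliyun.mns.MnsUtils

// Hypothetical application object for illustration.
object MnsMetaServiceExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test MNS Streaming with MetaService")
    val ssc = new StreamingContext(conf, Seconds(10))

    val queueName = "queuename"              // placeholder queue name
    val endpoint = "http://xxx.yyy.zzzz/abc" // placeholder endpoint

    // No accessKeyId or accessKeySecret is passed here; per the text above,
    // credentials are expected to be resolved through MetaService.
    val mnsStream = MnsUtils.createPullingStreamAsRawBytes(
      ssc, queueName, endpoint, StorageLevel.MEMORY_ONLY)

    // Print how many messages arrived in each batch.
    mnsStream.foreachRDD { rdd =>
      println(s"messages in batch: ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Compared with the explicit-AK call, only the argument list changes; the rest of the streaming job stays the same, so existing jobs can likely be migrated by dropping the two credential arguments.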
Appendix
For the complete sample code, see: