开发者社区> 问答> 正文

E-MapReduce Spark + ONS是什么?



Spark + ONS



Spark 接入 ONS


下面这个例子演示了 Spark Streaming 如何消费 ONS 中的数据,统计每个 batch 内的单词个数。

  1.     val Array(cId, topic, subExpression, parallelism, interval) = args
  2.     val accessKeyId = "<accessKeyId>"
  3.     val accessKeySecret = "<accessKeySecret>"
  4.     val numStreams = parallelism.toInt
  5.     val batchInterval = Milliseconds(interval.toInt)
  6.     val conf = new SparkConf().setAppName("Test ONS Streaming")
  7.     val ssc = new StreamingContext(conf, batchInterval)
  8.     def func: Message => Array[Byte] = msg => msg.getBody
  9.     val onsStreams = (0 until numStreams).map { i =>
  10.       println(s"starting stream $i")
  11.       OnsUtils.createStream(
  12.         ssc,
  13.         cId,
  14.         topic,
  15.         subExpression,
  16.         accessKeyId,
  17.         accessKeySecret,
  18.         StorageLevel.MEMORY_AND_DISK_2,
  19.         func)
  20.     }
  21.     val unionStreams = ssc.union(onsStreams)
  22.     unionStreams.foreachRDD(rdd => {
  23.       rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
  24.         .map(word => (word, 1))
  25.         .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
  26.     })
  27.     ssc.start()
  28.     ssc.awaitTermination()


附录


示例代码请看:

展开
收起
nicenelly 2017-10-27 16:04:57 1646 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载