开发者社区 问答 正文

E-MapReduce Spark + MNS是什么?



Spark + MNS



Spark 接入 MNS


下面这个例子演示了 Spark Streaming 如何消费 MNS 中的数据,统计每个 batch 内的单词个数。

  1. [backcolor=transparent]    val conf [backcolor=transparent]=[backcolor=transparent] [backcolor=transparent]new[backcolor=transparent] [backcolor=transparent]SparkConf[backcolor=transparent]().[backcolor=transparent]setAppName[backcolor=transparent]([backcolor=transparent]"Test MNS Streaming"[backcolor=transparent])
  2. [backcolor=transparent]    val batchInterval [backcolor=transparent]=[backcolor=transparent] [backcolor=transparent]Seconds[backcolor=transparent]([backcolor=transparent]10[backcolor=transparent])
  3. [backcolor=transparent]    val ssc [backcolor=transparent]=[backcolor=transparent] [backcolor=transparent]new[backcolor=transparent] [backcolor=transparent]StreamingContext[backcolor=transparent]([backcolor=transparent]conf[backcolor=transparent],[backcolor=transparent] batchInterval[backcolor=transparent])
  4. [backcolor=transparent]    val queuename [backcolor=transparent]=[backcolor=transparent] [backcolor=transparent]"queuename"
  5. [backcolor=transparent]    val accessKeyId [backcolor=transparent]=[backcolor=transparent] [backcolor=transparent]"<accessKeyId>"
  6. [backcolor=transparent]    val accessKeySecret [backcolor=transparent]=[backcolor=transparent] [backcolor=transparent]"<accessKeySecret>"
  7. [backcolor=transparent]    val endpoint [backcolor=transparent]=[backcolor=transparent] [backcolor=transparent]"http://xxx.yyy.zzzz/abc"
  8. [backcolor=transparent]    val mnsStream [backcolor=transparent]=[backcolor=transparent] [backcolor=transparent]MnsUtils[backcolor=transparent].[backcolor=transparent]createPullingStreamAsRawBytes[backcolor=transparent]([backcolor=transparent]ssc[backcolor=transparent],[backcolor=transparent] queuename[backcolor=transparent],[backcolor=transparent] accessKeyId[backcolor=transparent],[backcolor=transparent] accessKeySecret[backcolor=transparent],[backcolor=transparent] endpoint[backcolor=transparent],
  9. [backcolor=transparent]      [backcolor=transparent]StorageLevel[backcolor=transparent].[backcolor=transparent]MEMORY_ONLY[backcolor=transparent])
  10. [backcolor=transparent]    mnsStream[backcolor=transparent].[backcolor=transparent]foreachRDD[backcolor=transparent]([backcolor=transparent] rdd [backcolor=transparent]=>[backcolor=transparent] [backcolor=transparent]{
  11. [backcolor=transparent]      rdd[backcolor=transparent].[backcolor=transparent]map[backcolor=transparent]([backcolor=transparent]bytes [backcolor=transparent]=>[backcolor=transparent] [backcolor=transparent]new[backcolor=transparent] [backcolor=transparent]String[backcolor=transparent]([backcolor=transparent]bytes[backcolor=transparent])).[backcolor=transparent]flatMap[backcolor=transparent]([backcolor=transparent]line [backcolor=transparent]=>[backcolor=transparent] line[backcolor=transparent].[backcolor=transparent]split[backcolor=transparent]([backcolor=transparent]" "[backcolor=transparent]))
  12. [backcolor=transparent]        [backcolor=transparent].[backcolor=transparent]map[backcolor=transparent]([backcolor=transparent]word [backcolor=transparent]=>[backcolor=transparent] [backcolor=transparent]([backcolor=transparent]word[backcolor=transparent],[backcolor=transparent] [backcolor=transparent]1[backcolor=transparent]))
  13. [backcolor=transparent]        [backcolor=transparent].[backcolor=transparent]reduceByKey[backcolor=transparent]([backcolor=transparent]_ [backcolor=transparent]+[backcolor=transparent] _[backcolor=transparent]).[backcolor=transparent]collect[backcolor=transparent]().[backcolor=transparent]foreach[backcolor=transparent]([backcolor=transparent]e [backcolor=transparent]=>[backcolor=transparent] println[backcolor=transparent]([backcolor=transparent]s[backcolor=transparent]"word: ${e._1}, cnt: ${e._2}"[backcolor=transparent]))
  14. [backcolor=transparent]    [backcolor=transparent]})
  15. [backcolor=transparent]    ssc[backcolor=transparent].[backcolor=transparent]start[backcolor=transparent]()
  16. [backcolor=transparent]    ssc[backcolor=transparent].[backcolor=transparent]awaitTermination[backcolor=transparent]()


支持MetaService


上面的例子中,我们都是显式地将AK传入到接口中。不过从E-MapReduce SDK 1.3.2版本开始,Spark Streaming可以基于MetaService实现免AK处理MNS数据。具体可以参考E-MapReduce SDK中的MnsUtils类说明:
  1. [backcolor=transparent]MnsUtils[backcolor=transparent].[backcolor=transparent]createPullingStreamAsBytes[backcolor=transparent]([backcolor=transparent]ssc[backcolor=transparent],[backcolor=transparent] queueName[backcolor=transparent],[backcolor=transparent] endpoint[backcolor=transparent],[backcolor=transparent] storageLevel[backcolor=transparent])
  2. [backcolor=transparent]MnsUtils[backcolor=transparent].[backcolor=transparent]createPullingStreamAsRawBytes[backcolor=transparent]([backcolor=transparent]ssc[backcolor=transparent],[backcolor=transparent] queueName[backcolor=transparent],[backcolor=transparent] endpoint[backcolor=transparent],[backcolor=transparent] storageLevel[backcolor=transparent])


附录


完整示例代码请看:

展开
收起
nicenelly 2017-10-27 16:07:36 1883 分享
分享
版权
举报
阿里云 AI 助理回答

这段示例代码展示了如何使用Apache Spark Streaming与阿里云消息服务(MNS)集成,以实现从MNS队列中消费数据并进行处理的功能。下面是对代码的简要解析和相关阿里云产品的说明:

环境配置

  • Spark Streaming:这是Spark的一个组件,用于处理实时数据流。它允许程序能够高吞吐量地处理实时数据。
  • MNS (Message Service):阿里云的消息服务,提供可靠、高性能的消息队列服务,支持多种消息模型,如点对点、发布/订阅等。

代码解析

  1. 初始化配置:首先通过SparkConf设置Spark应用的基本信息,比如应用名。然后创建一个StreamingContext实例,指定批处理的时间间隔(例如,每10秒处理一次数据)。

  2. MNS配置:定义了MNS队列的名称、访问密钥ID、访问密钥秘密以及服务端点。这些是连接到MNS服务所必需的信息。

  3. 创建数据流:使用MnsUtils.createPullingStreamAsRawBytes方法创建一个DStream(离散化流),该方法会从指定的MNS队列中拉取原始字节数据。这里可以选择存储级别,如MEMORY_ONLY表示只在内存中缓存数据。

  4. 处理数据:通过foreachRDD操作,对每个批次的数据进行处理。具体步骤包括:

    • 将接收到的字节数据转换为字符串。
    • 使用flatMap将每行文本分割成单词。
    • 使用map操作为每个单词分配计数值1。
    • 使用reduceByKey按单词聚合计数。
    • 最后,收集并打印每个单词及其出现次数。
  5. 启动与等待终止:调用ssc.start()启动Spark Streaming上下文,并通过awaitTermination等待作业结束。

MetaService支持

  • MetaService是一个用于管理元数据的服务,可以简化AK(Access Key)和SK(Secret Key)的管理。E-MapReduce SDK 1.3.2及以上版本引入了对MetaService的支持,使得在某些场景下可以免去直接传递AK的操作,提高安全性与便捷性。通过MnsUtils类中的特定方法,可以在不直接暴露敏感信息的情况下,利用MetaService自动获取必要的认证信息来访问MNS。

注意事项

  • 确保已正确配置阿里云的访问密钥,并且网络环境允许访问MNS服务端点。
  • 示例代码中的endpoint需要替换为实际的MNS服务地址。
  • 使用MetaService时,需确保E-MapReduce集群或环境已经配置好相应的MetaService访问权限。

参考资源

  • 完整示例代码可以通过阿里云官方文档或GitHub上的E-MapReduce项目找到,这将帮助你更深入地理解和实施类似的集成方案。
有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等