开发者社区 问答 正文

E-MapReduce Spark + MNS是什么?



Spark + MNS



Spark 接入 MNS


下面这个例子演示了 Spark Streaming 如何消费 MNS 中的数据,统计每个 batch 内的单词个数。

  1.     val conf = new SparkConf().setAppName("Test MNS Streaming")
  2.     val batchInterval = Seconds(10)
  3.     val ssc = new StreamingContext(conf, batchInterval)
  4.     val queuename = "queuename"
  5.     val accessKeyId = "<accessKeyId>"
  6.     val accessKeySecret = "<accessKeySecret>"
  7.     val endpoint = "http://xxx.yyy.zzzz/abc"
  8.     val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
  9.       StorageLevel.MEMORY_ONLY)
  10.     mnsStream.foreachRDD( rdd => {
  11.       rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
  12.         .map(word => (word, 1))
  13.         .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
  14.     })
  15.     ssc.start()
  16.     ssc.awaitTermination()


支持MetaService


上面的例子中,我们都是显式地将AK传入到接口中。不过从E-MapReduce SDK 1.3.2版本开始,SparkStreaming可以基于MetaService实现免AK处理MNS数据。具体可以参考E-MapReduce SDK中的MnsUtils类说明:
  1. MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
  2. MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)


附录


完整示例代码请看:

展开
收起
nicenelly 2017-10-30 16:04:00 1700 分享 版权
阿里云 AI 助理回答

在您提供的示例中,展示了如何使用Apache Spark Streaming与阿里云消息服务(MNS)集成,以实现从MNS队列中消费数据并进行处理的功能。下面是对这段代码的解析以及提及的MetaService支持的简要说明:

Spark Streaming接入MNS

  1. 配置Spark环境: 首先,通过SparkConf设置应用名称和批处理时间间隔(例如,每10秒处理一次数据)。这创建了Spark Streaming上下文的基础配置。

  2. 创建MNS数据流: 使用自定义方法MnsUtils.createPullingStreamAsRawBytes来创建一个DStream(离散化流),该DStream直接从指定的MNS队列中拉取原始字节数据。此方法需要提供以下参数:

    • ssc: Spark Streaming上下文。
    • queuename: MNS队列的名称。
    • accessKeyId, accessKeySecret: 访问MNS服务所需的阿里云Access Key ID和Access Key Secret。
    • endpoint: MNS服务的访问地址。
    • storageLevel: RDD的存储级别,这里设置为MEMORY_ONLY,表示数据只存储在内存中。
  3. 处理数据: 对于每个batch interval内接收到的数据,通过一系列转换操作(如map、flatMap、map和reduceByKey)来统计单词出现次数,并打印结果。这是典型的Word Count任务应用于实时数据流。

MetaService支持

  • 免AK处理: 从E-MapReduce SDK 1.3.2版本开始,引入了对MetaService的支持,允许在特定环境下免去显式传递Access Key ID和Access Key Secret的步骤。MetaService通常用于内部自动管理凭证,简化安全性和权限管理。

  • 简化接口: 提供了简化版的MnsUtils.createPullingStreamAsBytesMnsUtils.createPullingStreamAsRawBytes方法,仅需提供队列名、端点和存储级别即可。在这种模式下,SDK会自动从MetaService获取必要的认证信息。

完整示例代码

虽然您提到“完整示例代码请看: Spark接入 MNS”,但未直接提供链接或代码段。根据描述,假设您希望查看或了解更多关于如何实际实施这一过程的代码细节,建议直接参考阿里云官方文档或E-MapReduce SDK的示例代码库。这些资源通常包含详细的步骤说明和完整的示例代码,帮助开发者快速上手并实现类似功能。

总之,通过上述方法,Spark Streaming能够高效地与MNS集成,实现数据的实时处理和分析,而MetaService的集成进一步优化了安全性与便捷性。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答