开发者社区 问答 正文

E-MapReduce Spark + ONS是什么?



Spark + ONS



Spark 接入 ONS


下面这个例子演示了 Spark Streaming 如何消费 ONS 中的数据,统计每个 batch 内的单词个数。

  1.     val Array(cId, topic, subExpression, parallelism, interval) = args
  2.     val accessKeyId = "<accessKeyId>"
  3.     val accessKeySecret = "<accessKeySecret>"
  4.     val numStreams = parallelism.toInt
  5.     val batchInterval = Milliseconds(interval.toInt)
  6.     val conf = new SparkConf().setAppName("Test ONS Streaming")
  7.     val ssc = new StreamingContext(conf, batchInterval)
  8.     def func: Message => Array[Byte] = msg => msg.getBody
  9.     val onsStreams = (0 until numStreams).map { i =>
  10.       println(s"starting stream $i")
  11.       OnsUtils.createStream(
  12.         ssc,
  13.         cId,
  14.         topic,
  15.         subExpression,
  16.         accessKeyId,
  17.         accessKeySecret,
  18.         StorageLevel.MEMORY_AND_DISK_2,
  19.         func)
  20.     }
  21.     val unionStreams = ssc.union(onsStreams)
  22.     unionStreams.foreachRDD(rdd => {
  23.       rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
  24.         .map(word => (word, 1))
  25.         .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
  26.     })
  27.     ssc.start()
  28.     ssc.awaitTermination()


附录


示例代码请看:

展开
收起
nicenelly 2017-10-30 16:00:06 1652 分享 版权
阿里云 AI 助理回答

这段示例代码展示了如何使用Apache Spark Streaming与阿里云的消息服务ONS(Open Notification Service)集成,以实现实时数据处理。下面是对代码关键部分的解析:

  1. 导入必要的参数:

    • args数组包含了从命令行传入的参数,包括ONS消费者ID(cId)、主题名(topic)、订阅表达式(subExpression)、并行度(parallelism)和处理时间间隔(interval)。
    • accessKeyIdaccessKeySecret是访问阿里云ONS服务所需的认证信息,这里需要根据实际情况填写。
  2. Spark配置与Streaming上下文创建:

    • 使用SparkConf设置应用名称,并通过batchInterval定义处理批次的时间间隔。
    • 创建一个StreamingContext实例ssc,它是Spark Streaming的基础,管理数据的接收和处理。
  3. 消息处理函数定义:

    • 定义了一个函数func,用于将接收到的ONS消息体转换为字节数组。
  4. 创建ONS数据流:

    • 通过循环创建多个数据流(基于numStreams),每个数据流都通过调用OnsUtils.createStream方法来初始化,该方法内部会连接到ONS并消费指定主题的数据。
    • 消费者配置了存储级别为MEMORY_AND_DISK_2,意味着数据在内存中保留两份副本,如果内存不足则溢写到磁盘。
  5. 合并数据流并处理数据:

    • 使用ssc.union(onsStreams)将所有数据流合并成一个统一的DStream。
    • 对合并后的DStream应用一系列转换操作:将字节数组转换为字符串,分割单词,统计每个单词出现的次数,最后通过reduceByKey聚合相同单词的计数。
    • 使用foreachRDD对每个处理批次的结果进行打印输出。
  6. 启动Spark Streaming应用:

    • 调用ssc.start()启动流处理作业,然后通过ssc.awaitTermination()等待作业结束或异常终止。

附录提到的“示例代码请看: Spark接入ONS”可能是指向更详细的文档或教程链接,但在这里没有直接给出。要获取完整示例代码及其运行环境要求,建议参考阿里云官方文档或相关开源项目库,确保正确配置依赖项(如ONS客户端库)以及Spark与ONS服务的网络连通性。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答