开发者社区> 问答> 正文

E-MapReduce Spark + Log Service是什么?



Spark + LogService



Spark 接入 LogService


下面这个例子演示了Spark Streaming如何消费LogService中的日志数据,统计日志条数。

方法一:Receiver Based DStream

  1.     val logServiceProject = args(0)    // LogService 中 project 名
  2.     val logStoreName = args(1)     // LogService 中 logstore 名
  3.     val loghubConsumerGroupName = args(2)  // loghubGroupName 相同的作业将共同消费 logstore 的数据
  4.     val loghubEndpoint = args(3)  // 阿里云日志服务数据类 API Endpoint
  5.     val accessKeyId = "<accessKeyId>"     // 访问日志服务的 AccessKeyId
  6.     val accessKeySecret = "<accessKeySecret>" // 访问日志服务的 AccessKeySecret
  7.     val numReceivers = args(4).toInt  // 启动多少个 Receiver 来读取 logstore 中的数据
  8.     val batchInterval = Milliseconds(args(5).toInt * 1000) // Spark Streaming 中每次处理批次时间间隔
  9.     val conf = new SparkConf().setAppName("Test Loghub Streaming")
  10.     val ssc = new StreamingContext(conf, batchInterval)
  11.     val loghubStream = LoghubUtils.createStream(
  12.       ssc,
  13.       logServiceProject,
  14.       logStoreName,
  15.       loghubConsumerGroupName,
  16.       loghubEndpoint,
  17.       numReceivers,
  18.       accessKeyId,
  19.       accessKeySecret,
  20.       StorageLevel.MEMORY_AND_DISK)
  21.     loghubStream.foreachRDD(rdd => println(rdd.count()))
  22.     ssc.start()
  23.     ssc.awaitTermination()


方法二: Direct API BasedDStream

  1.     val logServiceProject = args(0)
  2.     val logStoreName = args(1)
  3.     val loghubConsumerGroupName = args(2)
  4.     val loghubEndpoint = args(3)
  5.     val accessKeyId = args(4)
  6.     val accessKeySecret = args(5)
  7.     val batchInterval = Milliseconds(args(6).toInt * 1000)
  8.     val zkConnect = args(7)
  9.     val checkpointPath = args(8)
  10.     def functionToCreateContext(): StreamingContext = {
  11.       val conf = new SparkConf().setAppName("Test Direct Loghub Streaming")
  12.       val ssc = new StreamingContext(conf, batchInterval)
  13.       val zkParas = Map("zookeeper.connect" -> zkConnect, "enable.auto.commit" -> "false")
  14.       val loghubStream = LoghubUtils.createDirectStream(
  15.         ssc,
  16.         logServiceProject,
  17.         logStoreName,
  18.         loghubConsumerGroupName,
  19.         accessKeyId,
  20.         accessKeySecret,
  21.         loghubEndpoint,
  22.         zkParas,
  23.         LogHubCursorPosition.END_CURSOR)
  24.       ssc.checkpoint(checkpointPath)
  25.       val stream = loghubStream.checkpoint(batchInterval)
  26.       stream.foreachRDD(rdd => {
  27.         println(rdd.count())
  28.         loghubStream.asInstanceOf[DirectLoghubInputDStream].commitAsync()
  29.       })
  30.       ssc
  31.     }
  32.     val ssc = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)
  33.     ssc.start()
  34.     ssc.awaitTermination()

从E-MapReduce SDK 1.4.0版本开始,提供基于Direct API的实现方式。这种方式可以避免将Loghub数据重复存储到WriteAhead Log中,也即无需开启Spark Streaming的WAL特性即可实现数据的at least once。目前DirectAPI实现方式处于experimental状态,需要注意的地方有:
  • 在DStream的action中,必须做一次commit操作。
  • 一个Spark Streaming中,不支持对logstore数据源做多个action操作。
  • Direct API方式需要zookeeper服务的支持。


支持MetaService


上面的例子中,我们都是显式地将AK传入到接口中。不过从E-MapReduce SDK 1.3.2版本开始,SparkStreaming可以基于MetaService实现免AK处理LogService数据。具体可以参考E-MapReduceSDK中的LoghubUtils类说明:
  1. LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel)
  2. LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel)
  3. LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
  4. LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)


说明

  • E-MapReduceSDK支持LogService的三种消费模式,即“BEGIN_CURSOR”,“END_CURSOR”和“SPECIAL_TIMER_CURSOR”,默认是“END_CURSOR”。BEGIN_CURSOR:从日志头开始消费,如果有checkpoint记录,则从checkpoint处开始消费。
  • END_CURSOR:从日志尾开始消费,如果有checkpoint记录,则从checkpoint处开始消费。
  • SPECIAL_TIMER_CURSOR:从指定时间点开始消费,如果有checkpoint记录,则从checkpoint处开始消费。单位为秒。
  • 以上三种消费模式都受到checkpoint记录的影响,如果存在checkpoint记录,则从checkpoint处开始消费,不管指定的是什么消费模式。E-MapReduceSDK基于“SPECIAL_TIMER_CURSOR”模式支持用户强制在指定时间点开始消费:在LoghubUtils#createStream接口中,以下参数需要组合使用:cursorPosition:LogHubCursorPosition.SPECIAL_TIMER_CURSOR
  • forceSpecial:true
E-MapReduce 的机器(除了 Master 节点)无法连接公网。配置 LogService endpoint 时,请注意使用 LogService 提供的内网 endpoint,否则无法请求到 Log Service。更多关于 LogService,请查看 文档

附录


完整示例代码请看:

展开
收起
nicenelly 2017-10-30 16:03:21 1794 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载