Spark + LogService
Connecting Spark to LogService
The following example shows how Spark Streaming consumes log data from LogService and counts the number of log entries.
Method 1: Receiver-Based DStream
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    // SDK package path as used in the E-MapReduce SDK; verify it against your SDK version.
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils

    val logServiceProject = args(0)           // Name of the LogService project
    val logStoreName = args(1)                // Name of the LogService logstore
    val loghubConsumerGroupName = args(2)     // Jobs that share a consumer group name jointly consume the logstore's data
    val loghubEndpoint = args(3)              // Endpoint of the Alibaba Cloud Log Service data API
    val accessKeyId = "<accessKeyId>"         // AccessKeyId used to access Log Service
    val accessKeySecret = "<accessKeySecret>" // AccessKeySecret used to access Log Service
    val numReceivers = args(4).toInt          // Number of receivers started to read data from the logstore
    val batchInterval = Milliseconds(args(5).toInt * 1000) // Spark Streaming batch interval (argument given in seconds)

    val conf = new SparkConf().setAppName("Test Loghub Streaming")
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
      ssc,
      logServiceProject,
      logStoreName,
      loghubConsumerGroupName,
      loghubEndpoint,
      numReceivers,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK)

    // Print the number of log entries received in each batch.
    loghubStream.foreachRDD(rdd => println(rdd.count()))

    ssc.start()
    ssc.awaitTermination()
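The receiver-based example reads six positional arguments and converts the batch interval from seconds to milliseconds without checking them. As a minimal sketch, the arguments could be validated before the StreamingContext is created. `ReceiverArgs` and `parse` are hypothetical names introduced here for illustration; they are not part of the E-MapReduce SDK.

```scala
// Hypothetical helper for illustration only; not part of the E-MapReduce SDK.
import scala.util.Try

final case class ReceiverArgs(
    project: String,
    logStore: String,
    consumerGroup: String,
    endpoint: String,
    numReceivers: Int,
    batchIntervalMs: Long)

object ReceiverArgs {
  private val usage =
    "Usage: <project> <logstore> <consumerGroup> <endpoint> <numReceivers> <batchIntervalSeconds>"

  /** Parses the six positional arguments; returns Left(message) on bad input. */
  def parse(args: Array[String]): Either[String, ReceiverArgs] =
    if (args.length < 6) {
      Left(usage)
    } else {
      val parsed = for {
        receivers <- Try(args(4).toInt).toOption if receivers > 0
        seconds   <- Try(args(5).toInt).toOption if seconds > 0
      } yield ReceiverArgs(args(0), args(1), args(2), args(3), receivers, seconds * 1000L)
      parsed.toRight("numReceivers and batchIntervalSeconds must be positive integers")
    }
}
```

With this sketch, `ReceiverArgs.parse(args)` yields either an error message to print before exiting or a value whose `batchIntervalMs` field can be passed to `Milliseconds(...)`.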
Method 2: Direct API-Based DStream
    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    // SDK package path as used in the E-MapReduce SDK; verify it against your SDK version.
    import org.apache.spark.streaming.aliyun.logservice.{DirectLoghubInputDStream, LoghubUtils}

    val logServiceProject = args(0)
    val logStoreName = args(1)
    val loghubConsumerGroupName = args(2)
    val loghubEndpoint = args(3)
    val accessKeyId = args(4)
    val accessKeySecret = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000) // argument given in seconds
    val zkConnect = args(7)      // ZooKeeper connection string
    val checkpointPath = args(8) // Spark Streaming checkpoint directory

    // Builds a new context; getOrCreate calls this only when no checkpoint exists at checkpointPath.
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("Test Direct Loghub Streaming")
      val ssc = new StreamingContext(conf, batchInterval)
      val zkParas = Map("zookeeper.connect" -> zkConnect, "enable.auto.commit" -> "false")
      val loghubStream = LoghubUtils.createDirectStream(
        ssc,
        logServiceProject,
        logStoreName,
        loghubConsumerGroupName,
        accessKeyId,
        accessKeySecret,
        loghubEndpoint,
        zkParas,
        LogHubCursorPosition.END_CURSOR)

      ssc.checkpoint(checkpointPath)
      val stream = loghubStream.checkpoint(batchInterval)
      stream.foreachRDD(rdd => {
        println(rdd.count())
        // Auto commit is disabled above, so commit explicitly inside the action.
        loghubStream.asInstanceOf[DirectLoghubInputDStream].commitAsync()
      })
      ssc
    }

    val ssc = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()
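Starting with E-MapReduce SDK 1.4.0, a Direct API-based implementation is available. This approach avoids storing Loghub data a second time in the Write Ahead Log; in other words, at-least-once delivery is achieved without enabling the Spark Streaming WAL feature. The Direct API implementation is currently experimental. Note the following:
- A commit must be performed once inside the DStream action.
- Within one Spark Streaming application, multiple actions on the logstore data source are not supported.
- The Direct API approach requires a ZooKeeper service.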
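To illustrate why a commit inside the action gives at-least-once rather than exactly-once delivery, here is a toy model in plain Scala. It does not use Spark or the E-MapReduce SDK; `ToyLogSource` and `AtLeastOnceDemo` are hypothetical names, and the "crash" is simulated by simply skipping the commit for a chosen batch.

```scala
// Toy model only: none of these names come from Spark or the E-MapReduce SDK.
import scala.collection.mutable.ArrayBuffer

final class ToyLogSource(records: Vector[String]) {
  private var committed = 0 // offset of the first record not yet committed

  /** Reads up to `max` records starting at the last committed offset. */
  def fetch(max: Int): (Vector[String], Int) = {
    val batch = records.slice(committed, committed + max)
    (batch, committed + batch.size)
  }

  def commit(offset: Int): Unit = { committed = offset }
  def committedOffset: Int = committed
}

object AtLeastOnceDemo {
  /** Processes batches; attempts listed in `failOnAttempt` produce output but never commit. */
  def run(source: ToyLogSource, batchSize: Int, failOnAttempt: Set[Int]): Vector[String] = {
    val output = ArrayBuffer.empty[String]
    var attempt = 0
    var done = false
    while (!done) {
      val (batch, nextOffset) = source.fetch(batchSize)
      if (batch.isEmpty) {
        done = true
      } else {
        output ++= batch // the "action": its side effect happens before the commit
        if (!failOnAttempt.contains(attempt)) source.commit(nextOffset)
        attempt += 1
      }
    }
    output.toVector
  }
}
```

Running `AtLeastOnceDemo.run(new ToyLogSource(Vector("a", "b", "c", "d")), 2, Set(0))` returns `Vector(a, b, a, b, c, d)`: the first batch is replayed because its offset was never committed, so no record is lost but downstream processing has to tolerate duplicates.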
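MetaService Support
In the examples above, the AccessKey (AK) is passed to the interface explicitly. Starting with E-MapReduce SDK 1.3.2, however, Spark Streaming can process LogService data without an AK by relying on MetaService. For details, see the description of the LoghubUtils class in the E-MapReduce SDK: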
    LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel)
    LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel)
    LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
    LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
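Notes
- The E-MapReduce SDK supports three LogService consumption modes: "BEGIN_CURSOR", "END_CURSOR", and "SPECIAL_TIMER_CURSOR". The default is "END_CURSOR".
  - BEGIN_CURSOR: consume from the head of the log. If a checkpoint record exists, consume from the checkpoint.
  - END_CURSOR: consume from the tail of the log. If a checkpoint record exists, consume from the checkpoint.
  - SPECIAL_TIMER_CURSOR: consume from a specified point in time, given in seconds. If a checkpoint record exists, consume from the checkpoint.
- All three modes are affected by checkpoint records: if a checkpoint record exists, consumption starts from the checkpoint regardless of the mode specified. Based on the "SPECIAL_TIMER_CURSOR" mode, the E-MapReduce SDK lets users force consumption to start at a specified point in time. In the LoghubUtils#createStream interface, the following parameters must be used together:
  - cursorPosition: LogHubCursorPosition.SPECIAL_TIMER_CURSOR
  - forceSpecial: true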
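The precedence between cursor mode, checkpoint record, and forceSpecial described in these notes can be summarized as a small decision function. This is a hypothetical model in plain Scala, not the SDK's implementation: all type and function names are invented for illustration, and it assumes that forceSpecial has an effect only together with SPECIAL_TIMER_CURSOR, as the notes suggest.

```scala
// Hypothetical model of the rules in the notes; the real logic lives inside the SDK.
sealed trait CursorMode
case object BeginCursor extends CursorMode
case object EndCursor extends CursorMode
final case class SpecialTimerCursor(startTimeSeconds: Long) extends CursorMode

sealed trait StartPosition
case object FromLogHead extends StartPosition
case object FromLogTail extends StartPosition
final case class FromTime(seconds: Long) extends StartPosition
final case class FromCheckpoint(cursor: String) extends StartPosition

object CursorRules {
  /** Decides where consumption starts for a given mode, checkpoint, and forceSpecial flag. */
  def resolve(
      mode: CursorMode,
      checkpoint: Option[String],
      forceSpecial: Boolean = false): StartPosition =
    (mode, checkpoint) match {
      // Forced start time: the checkpoint is ignored (assumed to apply to this mode only).
      case (SpecialTimerCursor(t), _) if forceSpecial => FromTime(t)
      // Otherwise an existing checkpoint always wins, whatever the mode.
      case (_, Some(cursor)) => FromCheckpoint(cursor)
      case (BeginCursor, None) => FromLogHead
      case (EndCursor, None) => FromLogTail
      case (SpecialTimerCursor(t), None) => FromTime(t)
    }
}
```

For example, `CursorRules.resolve(BeginCursor, Some("c1"))` resolves to `FromCheckpoint("c1")`, while `CursorRules.resolve(SpecialTimerCursor(1500000000L), Some("c1"), forceSpecial = true)` resolves to `FromTime(1500000000L)`.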
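E-MapReduce machines (other than the Master node) cannot connect to the public network. When configuring the LogService endpoint, make sure to use the intranet endpoint provided by Log Service; otherwise requests to Log Service will fail. For more information about LogService, see the LogService documentation.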
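A job could fail fast when it is handed a public endpoint by mistake. The sketch below is a heuristic under a stated assumption: that intranet endpoints carry an "-intranet" suffix on the region label of the host name (for example `cn-hangzhou-intranet.log.aliyuncs.com`). Check the endpoint list in the LogService documentation before relying on this. `EndpointCheck` and `looksIntranet` are hypothetical names.

```scala
// Heuristic sketch; the "-intranet" naming convention is an assumption to verify.
object EndpointCheck {
  /** Returns true if the first host label of the endpoint ends with "-intranet". */
  def looksIntranet(endpoint: String): Boolean = {
    val host = endpoint.trim.toLowerCase
      .stripPrefix("http://")
      .stripPrefix("https://")
      .takeWhile(c => c != '/' && c != ':') // drop any path or port
    host.split('.').headOption.exists(_.endsWith("-intranet"))
  }
}
```

A driver program could call `EndpointCheck.looksIntranet(loghubEndpoint)` right after reading its arguments and log a warning or exit when the result is false.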
Appendix
For the complete sample code, see: