Spark 实时计算整合案例

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
实时计算 Flink 版,5000CU*H 3个月
简介:

1.概述

  最近有同学问道,除了使用 Storm 充当实时计算的模型外,还有木有其他的方式来实现实时计算的业务。了解到,在使用 Storm 时,需要编写基于编程语言的代码。比如,要实现一个流水指标的统计,需要去编写相应的业务代码,能不能有一种简便的方式来实现这一需求。在解答了该同学的疑惑后,整理了该实现方案的一个案例,供后面的同学学习参考。

2.内容

  实现该方案,整体的流程是不变的,我这里只是替换了其计算模型,将 Storm 替换为 Spark,原先的数据收集,存储依然可以保留。

2.1 Spark Overview

  Spark 出来也是很久了,说起它,应该并不会陌生。它是一个开源的类似于 Hadoop MapReduce 的通用并行计算模型,它拥有 Hadoop MapReduce 所具有的有点,但与其不同的是,MapReduce 的 JOB 中间输出结果可以保存在内存中,不再需要回写磁盘,因而,Spark 能更好的适用于需要迭代的业务场景。

2.2 Flow

  上面只是对 Spark 进行了一个简要的概述,让大家知道其作用,由于本篇博客的主要内容并不是讲述 Spark 的工作原理和计算方法,多的内容,这里笔者就不再赘述,若是大家想详细了解 Spark 的相关内容,可参考官方文档。[参考地址

  接下来,笔者为大家呈现本案例的一个实现流程图,如下图所示:

  通过上图,我们可以看出,首先是采集上报的日志数据,将其存放于消息中间件,这里消息中间件采用的是 Kafka,然后在使用计算模型按照业务指标实现相应的计算内容,最后是将计算后的结果进行持久化,DB 的选择可以多样化,这里笔者就直接使用了 Redis 来作为演示的存储介质,大家所示在使用中,可以替换该存储介质,比如将结果存放到 HDFS,HBase Cluster,或是 MySQL 等都行。这里,我们使用 Spark SQL 来替换掉 Storm 的业务实现编写。

3.实现

  在介绍完上面的内容后,我们接下来就去实现该内容,首先我们要生产数据源,实际的场景下,会有上报好的日志数据,这里,我们就直接写一个模拟数据类,实现代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
object  KafkaIPLoginProducer {
   private  val  uid  =  Array( "123dfe" "234weq" , "213ssf" )
 
   private  val  random  =  new  Random()
 
   private  var  pointer  =  - 1
 
   def  getUserID() :  String  =  {
     pointer  =  pointer +  1
     if  (pointer > =  users.length) {
       pointer  =  0
       uid(pointer)
     else  {
       uid(pointer)
     }
   }
 
   def  plat() :  String  =  {
     random.nextInt( 10 ) +  "10"
   }
 
   def  ip() :  String  =  {
     random.nextInt( 10 ) +  ".12.1.211"
   }
 
   def  country() :  String  =  {
     "中国"  + random.nextInt( 10 )
   }
 
   def  city() :  String  =  {
     "深圳"  + random.nextInt( 10 )
   }
 
   def  location() :  JSONArray  =  {
     JSON.parseArray( "["  + random.nextInt( 10 ) +  ","  + random.nextInt( 10 ) +  "]" )
   }
 
   def  main(args :  Array[String]) :  Unit  =  {
     val  topic  =  "test_data3"
     val  brokers  =  "dn1:9092,dn2:9092,dn3:9092"
     val  props  =  new  Properties()
     props.put( "metadata.broker.list" , brokers)
     props.put( "serializer.class" "kafka.serializer.StringEncoder" )
 
     val  kafkaConfig  =  new  ProducerConfig(props)
     val  producer  =  new  Producer[String, String](kafkaConfig)
 
     while  ( true ) {
       val  event  =  new  JSONObject()
 
       event
         .put( "_plat" "1001" )
         .put( "_uid" "10001" )
         .put( "_tm" , (System.currentTimeMillis /  1000 ).toString())
         .put( "ip" , ip)
         .put( "country" , country)
         .put( "city" , city)
         .put( "location" , JSON.parseArray( "[0,1]" ))
       println( "Message sent: "  + event)
       producer.send( new  KeyedMessage[String, String](topic, event.toString))
       
       event
         .put( "_plat" "1001" )
         .put( "_uid" "10001" )
         .put( "_tm" , (System.currentTimeMillis /  1000 ).toString())
         .put( "ip" , ip)
         .put( "country" , country)
         .put( "city" , city)
         .put( "location" , JSON.parseArray( "[0,1]" ))
       println( "Message sent: "  + event)
       producer.send( new  KeyedMessage[String, String](topic, event.toString))
       
       event
         .put( "_plat" "1001" )
         .put( "_uid" "10002" )
         .put( "_tm" , (System.currentTimeMillis /  1000 ).toString())
         .put( "ip" , ip)
         .put( "country" , country)
         .put( "city" , city)
         .put( "location" , JSON.parseArray( "[0,1]" ))
       println( "Message sent: "  + event)
       producer.send( new  KeyedMessage[String, String](topic, event.toString))
 
       event
         .put( "_plat" "1002" )
         .put( "_uid" "10001" )
         .put( "_tm" , (System.currentTimeMillis /  1000 ).toString())
         .put( "ip" , ip)
         .put( "country" , country)
         .put( "city" , city)
         .put( "location" , JSON.parseArray( "[0,1]" ))
       println( "Message sent: "  + event)
       producer.send( new  KeyedMessage[String, String](topic, event.toString))
       Thread.sleep( 30000 )
     }
   }
}

   上面代码,通过 Thread.sleep() 来控制数据生产的速度。接下来,我们来看看如何实现每个用户在各个区域所分布的情况,它是按照坐标分组,平台和用户ID过滤进行累加次数,逻辑用 SQL 实现较为简单,关键是在实现过程中需要注意的一些问题,比如对象的序列化问题。这里,细节的问题,我们先不讨论,先看下实现的代码,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
object  IPLoginAnalytics {
 
   def  main(args :  Array[String]) :  Unit  =  {
     val  sdf  =  new  SimpleDateFormat( "yyyyMMdd" )
     var  masterUrl  =  "local[2]"
     if  (args.length >  0 ) {
       masterUrl  =  args( 0 )
     }
 
     // Create a StreamingContext with the given master URL
     val  conf  =  new  SparkConf().setMaster(masterUrl).setAppName( "IPLoginCountStat" )
     val  ssc  =  new  StreamingContext(conf, Seconds( 5 ))
 
     // Kafka configurations
     val  topics  =  Set( "test_data3" )
     val  brokers  =  "dn1:9092,dn2:9092,dn3:9092"
     val  kafkaParams  =  Map[String, String](
       "metadata.broker.list"  -> brokers,  "serializer.class"  ->  "kafka.serializer.StringEncoder" )
 
     val  ipLoginHashKey  =  "mf::ip::login::"  + sdf.format( new  Date())
 
     // Create a direct stream
     val  kafkaStream  =  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
 
     val  events  =  kafkaStream.flatMap(line  = > {
       val  data  =  JSONObject.fromObject(line. _ 2 )
       Some(data)
     })
 
     def  func(iter :  Iterator[(String, String)]) :  Unit  =  {
       while  (iter.hasNext) {
         val  item  =  iter.next()
         println(item. _ 1  ","  + item. _ 2 )
       }
     }
 
     events.foreachRDD { rdd  = >
       // Get the singleton instance of SQLContext
       val  sqlContext  =  SQLContextSingleton.getInstance(rdd.sparkContext)
       import  sqlContext.implicits. _
       // Convert RDD[String] to DataFrame
       val  wordsDataFrame  =  rdd.map(f  = > Record(f.getString( "_plat" ), f.getString( "_uid" ), f.getString( "_tm" ), f.getString( "country" ), f.getString( "location" ))).toDF()
 
       // Register as table
       wordsDataFrame.registerTempTable( "events" )
       // Do word count on table using SQL and print it
       val  wordCountsDataFrame  =  sqlContext.sql( "select location,count(distinct plat,uid) as value from events where from_unixtime(tm,'yyyyMMdd') = '"  + sdf.format( new  Date()) +  "' group by location" )
       var  results  =  wordCountsDataFrame.collect().iterator
 
       /**
        * Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}
        */
       object  InternalRedisClient  extends  Serializable {
 
         @ transient  private  var  pool :  JedisPool  =  null
 
         def  makePool(redisHost :  String, redisPort :  Int, redisTimeout :  Int,
           maxTotal :  Int, maxIdle :  Int, minIdle :  Int) :  Unit  =  {
           makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle,  true false 10000 )
         }
 
         def  makePool(redisHost :  String, redisPort :  Int, redisTimeout :  Int,
           maxTotal :  Int, maxIdle :  Int, minIdle :  Int, testOnBorrow :  Boolean,
           testOnReturn :  Boolean, maxWaitMillis :  Long) :  Unit  =  {
           if  (pool  ==  null ) {
             val  poolConfig  =  new  GenericObjectPoolConfig()
             poolConfig.setMaxTotal(maxTotal)
             poolConfig.setMaxIdle(maxIdle)
             poolConfig.setMinIdle(minIdle)
             poolConfig.setTestOnBorrow(testOnBorrow)
             poolConfig.setTestOnReturn(testOnReturn)
             poolConfig.setMaxWaitMillis(maxWaitMillis)
             pool  =  new  JedisPool(poolConfig, redisHost, redisPort, redisTimeout)
 
             val  hook  =  new  Thread {
               override  def  run  =  pool.destroy()
             }
             sys.addShutdownHook(hook.run)
           }
         }
 
         def  getPool :  JedisPool  =  {
           assert(pool ! =  null )
           pool
         }
       }
 
       // Redis configurations
       val  maxTotal  =  10
       val  maxIdle  =  10
       val  minIdle  =  1
       val  redisHost  =  "dn1"
       val  redisPort  =  6379
       val  redisTimeout  =  30000
       InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
       val  jedis  =  InternalRedisClient.getPool.getResource
       while  (results.hasNext) {
         var  item  =  results.next()
         var  key  =  item.getString( 0 )
         var  value  =  item.getLong( 1 )
         jedis.hincrBy(ipLoginHashKey, key, value)
       }
     }
 
     ssc.start()
     ssc.awaitTermination()
 
   }
}
 
/** Case class for converting RDD to DataFrame */
case  class  Record(plat :  String, uid :  String, tm :  String, country :  String, location :  String)
 
/** Lazily instantiated singleton instance of SQLContext */
object  SQLContextSingleton {
 
   @ transient  private  var  instance :  SQLContext  =  _
 
   def  getInstance(sparkContext :  SparkContext) :  SQLContext  =  {
     if  (instance  ==  null ) {
       instance  =  new  SQLContext(sparkContext)
     }
     instance
   }
}

  我们在开发环境进行测试的时候,使用 local[k] 部署模式,在本地启动 K 个 Worker 线程来进行计算,而这 K 个 Worker 在同一个 JVM 中,上面的示例,默认使用 local[k] 模式。这里我们需要普及一下 Spark 的架构,架构图来自 Spark 的官网,[链接地址]

  这里,不管是在 local[k] 模式,Standalone 模式,还是 Mesos 或是 YARN 模式,整个 Spark Cluster 的结构都可以用改图来阐述,只是各个组件的运行环境略有不同,从而导致他们可能运行在分布式环境,本地环境,亦或是一个 JVM 实利当中。例如,在 local[k] 模式,上图表示在同一节点上的单个进程上的多个组件,而对于 YARN 模式,驱动程序是在 YARN Cluster 之外的节点上提交 Spark 应用,其他组件都是运行在 YARN Cluster 管理的节点上的。

  而对于 Spark Cluster 部署应用后,在进行相关计算的时候会将 RDD 数据集上的函数发送到集群中的 Worker 上的 Executor,然而,这些函数做操作的对象必须是可序列化的。上述代码利用 Scala 的语言特性,解决了这一问题。

4.结果预览

  在完成上述代码后,我们执行代码,看看预览结果如下,执行结果,如下所示:

4.1 启动生产线程

4.2 Redis 结果预览

5.总结

  整体的实现内容不算太复杂,统计的业务指标,这里我们使用 SQL 来完成这部分工作,对比 Storm 来说,我们专注 SQL 的编写就好,难度不算太大。可操作性较为友好。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

联系方式: 
邮箱:smartloli.org@gmail.com 
Twitter: https://twitter.com/smartloli 
QQ群(Hadoop - 交流社区1): 424769183 
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢! 

热爱生活,享受编程,与君共勉!



本文转自哥不是小萝莉博客园博客,原文链接:http://www.cnblogs.com/smartloli/,如需转载请自行联系原作者

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4月前
|
分布式计算 Serverless 调度
EMR Serverless Spark:结合实时计算 Flink 基于 Paimon 实现流批一体
本文演示了使用实时计算 Flink 版和 Serverless Spark 产品快速构建 Paimon 数据湖分析的流程,包括数据入湖 OSS、交互式查询,以及离线Compact。Serverless Spark完全兼容Paimon,通过内置的DLF的元数据实现了和其余云产品如实时计算Flink版的元数据互通,形成了完整的流批一体的解决方案。同时支持灵活的作业运行方式和参数配置,能够满足实时分析、生产调度等多项需求。
60759 107
|
3月前
|
分布式计算 数据处理 流计算
实时计算 Flink版产品使用问题之使用Spark ThriftServer查询同步到Hudi的数据时,如何实时查看数据变化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL 分布式计算 HIVE
实时计算 Flink版产品使用问题之同步到Hudi的数据是否可以被Hive或Spark直接读取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
分布式计算 Hadoop 大数据
分布式计算框架比较:Hadoop、Spark 与 Flink
【5月更文挑战第31天】Hadoop是大数据处理的开创性框架,专注于大规模批量数据处理,具有高扩展性和容错性。然而,它在实时任务上表现不足。以下是一个简单的Hadoop MapReduce的WordCount程序示例,展示如何统计文本中单词出现次数。
163 0
|
5月前
|
NoSQL API MongoDB
实时计算 Flink版产品使用合集之断点续传的案例在哪里可以找到
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即"Top N"问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
149 1
|
5月前
|
分布式计算 大数据 数据处理
【Flink】Flink跟Spark Streaming的区别?
【4月更文挑战第17天】【Flink】Flink跟Spark Streaming的区别?
|
5月前
|
消息中间件 分布式计算 数据处理
Flink与Spark的区别是什么?请举例说明。
Flink与Spark的区别是什么?请举例说明。
107 0
|
5月前
|
数据可视化 JavaScript 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
99 0
|
5月前
|
SQL 消息中间件 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
147 0
下一篇
无影云桌面