Flume+Kafka+Spark Streaming+MySQL实时日志分析

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 网络发展迅速的时代,越来越多人通过网络获取跟多的信息或通过网络作一番自己的事业,当投身于搭建属于自己的网站、APP或小程序时会发现,经过一段时间经营和维护发现浏览量和用户数量的增长速度始终没有提升。在对其进行设计改造时无从下手,当在不了解用户的浏览喜欢和个用户群体的喜好。虽然服务器日志中明确的记载了用户访浏览的喜好但是通过普通方式很难从大量的日志中及时有效的筛选出优质信息。Spark Streaming是一个实时的流计算框架,该技术可以对数据进行实时快速的分析,通过与Flume、Kafka的结合能够做到近乎零延迟的数据统计分析。

项目背景


网络发展迅速的时代,越来越多人通过网络获取跟多的信息或通过网络作一番自己的事业,当投身于搭建属于自己的网站、APP或小程序时会发现,经过一段时间经营和维护发现浏览量和用户数量的增长速度始终没有提升。在对其进行设计改造时无从下手,当在不了解用户的浏览喜欢和个用户群体的喜好。虽然服务器日志中明确的记载了用户访浏览的喜好但是通过普通方式很难从大量的日志中及时有效的筛选出优质信息。Spark Streaming是一个实时的流计算框架,该技术可以对数据进行实时快速的分析,通过与Flume、Kafka的结合能够做到近乎零延迟的数据统计分析。


案例需求

要求:实时分析服务器日志数据,并实时计算出某时间段内的浏览量等信息。


使用技术:Flume-》Kafka-》SparkStreaming-》MySql数据库


#案例架构

image.png


架构中通过Flume实时监控日志文件,当日志文件中出现新数据时将该条数据发送给Kafka,并有Spark Streaming接收进行实时的数据分析最后将分析结果保存到MySQL数据库中。


一、分析

1、日志分析

1.通过浏览器访问服务器中的网页,每访问一次就会产生一条日志信息。日志中包含访问者IP、访问时间、访问地址、状态码和耗时等信息,如下图所示:

image.png

二、日志采集

第一步、代码编辑

通过使用Flume实时监控服务器日志文件内容,每生成一条都会进行采集,并将采集的结构发送给Kafka,Flume代码如下。

image.png

2、启动采集代码

代码编辑完成后启动Flume对服务器日志信息进行监控,进入Flume安装目录执行如下代码。

image.png

[root@master flume]# bin/flume-ng agent --name a1 --conf conf  --conf-file conf/access_log-HDFS.properties  -Dflume.root.logger=INFO,console


三、编写Spark Streaming的代码

第一步 创建工程

image.png

第二步 选择创建Scala工程

image.png

第三步 设置工程名与工程所在路径和使用的Scala版本后完成创建

image.png


第四步 创建scala文件

项目目录的”src”处单机鼠标右键依次选择”New”->”Package”创建一个包名为”com.wordcountdemo”,并在该包处单机右键依次选择”New”->”scala class”创建文件命名为wordcount

image.png

第五步:导入依赖包

在IDEA中导入Spark依赖包,在菜单中依次选择”File”->”Project Structure”->”Libraries”后单击”+”号按钮选择”Java”选项,在弹出的对话框中找到spark-assembly-1.6.1-hadoop2.6.0.jar依赖包点击”OK”将所有依赖包加载到工程中,结果如图X所示。

image.png

第六步:引入本程序所需要的全部方法

注意此处使用了三个spark2中没有的jar包分别为kafka_2.11-0.8.2.1.jar、

metrics-core-2.2.0.jar、spark-streaming-kafka_2.11-1.6.3.jar。

image.png

import java.sql.DriverManager                       //连接数据库
import kafka.serializer.StringDecoder                  //序列化数据
import org.apache.spark.streaming.dstream.DStream      //接收输入数据流
import org.apache.spark.streaming.kafka.KafkaUtils      //连接Kafka 
import org.apache.spark.streaming.{Seconds, StreamingContext}  //实时流处理
import org.apache.spark.SparkConf                      //spark程序的入口函数


第七步:创建main函数与Spark程序入口。

def main(args: Array[String]): Unit = {
  //创建sparksession
  val conf = new SparkConf().setAppName("Consumer")
  val ssc = new StreamingContext(conf,Seconds(20))  //设置每隔20秒接收并计算一次
}

image.png

第八步:设置kafka服务的主机地址和端口号,并设置从哪个topic接收数据和设置消费者组

//kafka服务器地址
val kafkaParam = Map("metadata.broker.list" -> "192.168.10.10:9092")
//设置topic
val topic = "testSpark".split(",").toSet
//接收kafka数据
val logDStream: DStream[String] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParam,topic).map(_._2)

第九步:数分析

接收到数据后,对数据进行分析,将服务器日志数据按照空格进行拆分,并分别统计出阶段时间内的网站浏览量、用户注册数量和用户的跳出率并将统计结果转换为键值对类型的RDD。


//拆分接收到的数据
    val RDDIP =logDStream.transform(rdd=>rdd.map(x=>x.split(" ")))
    //进行数据分析
    val pv = RDDIP.map(x=>x(0)).count().map(x=>("pv",x))   //用户浏览量
    val jumper = RDDIP.map(x=>x(0)).map((_,1)).reduceByKey(_+_).filter(x=>x._2 == 1).map(x=>x._1).count.map(x=>("jumper",x))   //跳出率
    val reguser =RDDIP.filter(_(8).replaceAll("\"","").toString == "/member.php?mod=register&inajax=1").count.map(x=>("reguser",x))  //注册用户数量

第十步:保存计算结果

遍历统计结果RDD取出键值对中的值并分别分别将分析结果保存到pvtab、jumpertab和regusetab表中,最后启动Spark Streaming程序。


pv.foreachRDD(line =>line.foreachPartition(rdd=>{
      rdd.foreach(word=>{
        val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")
        val format = new java.text.SimpleDateFormat("yyyy-MM-dd H:mm:ss")
        val dateFf= format.format(new java.util.Date())
        val sql = "insert into pvtab(time,pv) values("+"'"+dateFf+"'," +"'"+word._2+"')"
        conn.prepareStatement(sql).executeUpdate()
      })
      }))
    jumper.foreachRDD(line =>line.foreachPartition(rdd=>{
      rdd.foreach(word=>{
        val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")
        val format = new java.text.SimpleDateFormat("yyyy-MM-dd H:mm:ss")
        val dateFf= format.format(new java.util.Date())
        val sql = "insert into jumpertab(time,jumper) values("+"'"+dateFf+"'," +"'"+word._2+"')"
        conn.prepareStatement(sql).executeUpdate()
    })
    }))
    reguser.foreachRDD(line =>line.foreachPartition(rdd=>{
      rdd.foreach(word=>{
        val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")
        val format = new java.text.SimpleDateFormat("yyyy-MM-dd H:mm:ss")
        val dateFf= format.format(new java.util.Date())
        val sql = "insert into regusetab(time,reguse) values("+"'"+dateFf+"'," +"'"+word._2+"')"
        conn.prepareStatement(sql).executeUpdate()
     })
    }))
    ssc.start()        //启动Spark Streaming程序


第十一步 数据库设计

创建一个数据库名为“test”,并在该库中创建三个表分别名为"jumpertab"、“pvtab”、“regusetab”,数据库结构如下图所示


jumpertab表

image.png


pvtab表

image.png


regusetab表

image.png


四、编译运行

将程序编辑为jar包提交到集群中运行。

image.png

第一步、将工程添加到jar文件并设置文件名称

选择“File”-“Project Structure”命令,在弹出的对话框中选择“Artifacts”按钮,选择“+”下的“JAR”->“Empty”在随后弹出的对话框中“NAME”处设置JAR文件的名字为“WordCount”,并双击右侧“firstSpark”下的“’firstSpark’compile output”将其加载到左侧,表示已经将工程添加到JAR包中然后点击“OK”按钮

image.png

第二步、生成jar包

点击菜单栏中的“Build”->“Build Artifacts…”按钮在弹出的对话框中单击“Build”按钮,jar包生成后工程根目录会自动创建一个out目录在目录中可以看到生成的jar包,


第三步、提交运行Spark Streaming程序

[root@master bin]# ./spark-submit --master local[*] --class  com.spark.streaming.sparkword /usr/local/Streaminglog.jar 
1

image.png

第四步:查看数据库

image.png

完整代码如下

package spark
import java.sql.DriverManager
import java.util.Calendar
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
object kafkaspark {
  def main(args: Array[String]): Unit = {
    //    创建sparksession
    val conf = new SparkConf().setAppName("Consumer")
    val ssc = new StreamingContext(conf,Seconds(1))
    val kafkaParam = Map("metadata.broker.list" -> "192.168.10.10:9092")
    val topic = "testSpark".split(",").toSet
    //接收kafka数据
    val logDStream: DStream[String] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParam,topic).map(_._2)
    //拆分接收到的数据
    val RDDIP =logDStream.transform(rdd=>rdd.map(x=>x.split(" ")))
    //进行数据分析
    val pv = RDDIP.map(x=>x(0)).count().map(x=>("pv",x))
    val jumper = RDDIP.map(x=>x(0)).map((_,1)).reduceByKey(_+_).filter(x=>x._2 == 1).map(x=>x._1).count.map(x=>("jumper",x))
    val reguser =RDDIP.filter(_(8).replaceAll("\"","").toString == "/member.php?mod=register&inajax=1").count.map(x=>("reguser",x))
    //将分析结果保存到MySQL数据库
      pv.foreachRDD(line =>line.foreachPartition(rdd=>{
          rdd.foreach(word=>{
            val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")
            val format = new java.text.SimpleDateFormat("H:mm:ss")
            val dateFf= format.format(new java.util.Date())
            var cal:Calendar=Calendar.getInstance()
            cal.add(Calendar.SECOND,-1)
            var Beforeasecond=format.format(cal.getTime())
            val date = Beforeasecond.toString+"-"+dateFf.toString
            val sql = "insert into pvtab(time,pv) values("+"'"+date+"'," +"'"+word._2+"')"
            conn.prepareStatement(sql).executeUpdate()
          })
          }))
    jumper.foreachRDD(line =>line.foreachPartition(rdd=>{
      rdd.foreach(word=>{
        val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")
        val format = new java.text.SimpleDateFormat("H:mm:ss")
        val dateFf= format.format(new java.util.Date())
        var cal:Calendar=Calendar.getInstance()
        cal.add(Calendar.SECOND,-1)
        var Beforeasecond=format.format(cal.getTime())
        val date = Beforeasecond.toString+"-"+dateFf.toString
        val sql = "insert into jumpertab(time,jumper) values("+"'"+date+"'," +"'"+word._2+"')"
        conn.prepareStatement(sql).executeUpdate()
      })
    }))
    reguser.foreachRDD(line =>line.foreachPartition(rdd=>{
      rdd.foreach(word=>{
        val conn = DriverManager.getConnection("jdbc:mysql://master:3306/test", "root", "123456")
        val format = new java.text.SimpleDateFormat("H:mm:ss")
        val dateFf= format.format(new java.util.Date())
        var cal:Calendar=Calendar.getInstance()
        cal.add(Calendar.SECOND,-1)
        var Beforeasecond=format.format(cal.getTime())
        val date = Beforeasecond.toString+"-"+dateFf.toString
        val sql = "insert into regusetab(time,reguse) values("+"'"+date+"'," +"'"+word._2+"')"
        conn.prepareStatement(sql).executeUpdate()
      })
    }))
    val num = logDStream.map(x=>(x,1)).reduceByKey(_+_)
    num.print()
    //启动Streaming
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
Flume+Kafka+Spark Streaming+MySQL实时数据处理_u014552259的博客-CSDN博客_flume kafka spark


目录
相关文章
|
2月前
|
SQL 运维 关系型数据库
深入探讨MySQL的二进制日志(binlog)选项
总结而言,对MySQL binlogs深度理解并妥善配置对数据库运维管理至关重要;它不仅关系到系统性能优化也是实现高可靠性架构设计必须考虑因素之一。通过精心规划与周密部署可以使得该机能充分发挥作用而避免潜在风险带来影响。
126 6
|
2月前
|
监控 安全 搜索推荐
使用EventLog Analyzer进行日志取证分析
EventLog Analyzer助力企业通过集中采集、归档与分析系统日志及syslog,快速构建“数字犯罪现场”,精准追溯安全事件根源。其强大搜索功能可秒级定位入侵时间、人员与路径,生成合规与取证报表,确保日志安全防篡改,大幅提升调查效率,为执法提供有力证据支持。
144 0
|
4月前
|
监控 安全 NoSQL
【DevOps】Logstash详解:高效日志管理与分析工具
Logstash是ELK Stack核心组件之一,具备强大的日志收集、处理与转发能力。它支持多种数据来源,提供灵活的过滤、转换机制,并可通过插件扩展功能,广泛应用于系统日志分析、性能优化及安全合规等领域,是现代日志管理的关键工具。
742 0
|
6月前
|
SQL 监控 关系型数据库
MySQL日志分析:binlog、redolog、undolog三大日志的深度探讨。
数据库管理其实和写小说一样,需要规划,需要修订,也需要有能力回滚。理解这些日志的作用与优化,就像把握写作工具的使用与运用,为我们的数据库保驾护航。
291 23
|
6月前
|
自然语言处理 监控 安全
阿里云发布可观测MCP!支持自然语言查询和分析多模态日志
阿里云可观测官方发布了Observable MCP Server,提供了一系列访问阿里云可观测各产品的工具能力,包含阿里云日志服务SLS、阿里云应用实时监控服务ARMS等,支持用户通过自然语言形式查询
843 0
阿里云发布可观测MCP!支持自然语言查询和分析多模态日志
|
5月前
|
人工智能 运维 监控
Aipy实战:分析apache2日志中的网站攻击痕迹
Apache2日志系统灵活且信息全面,但安全分析、实时分析和合规性审计存在较高技术门槛。为降低难度,可借助AI工具如aipy高效分析日志,快速发现攻击痕迹并提供反制措施。通过结合AI与学习技术知识,新手运维人员能更轻松掌握复杂日志分析任务,提升工作效率与技能水平。
|
10月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
469 1
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
324 1
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
1168 9