利用Spark解析Tomcat日志,并将统计结果存入Mysql数据库

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: 本文试图实现的需求场景为:以学习Spark知识点为目的,编写Scala利用Spark解析800M的tomcat日志文件,打印一段时间内ERROR级别记录的前10行,统计每分钟的日志记录数,并将统计结果存入mysql数据库中。之前曾用JAVA写过一次同样的处理逻辑,但在学习了Scala之后,真的感觉在计算方面Scala要比JAVA方便的多。没有学习Scala语言的同学速度速度了啊……

本文试图实现的需求场景为:以学习Spark知识点为目的,编写Scala利用Spark解析800M的tomcat日志文件,打印一段时间内ERROR级别记录的前10行,统计每分钟的日志记录数,并将统计结果存入mysql数据库中。之前曾用JAVA写过一次同样的处理逻辑,但在学习了Scala之后,真的感觉在计算方面Scala要比JAVA方便的多。没有学习Scala语言的同学速度速度了啊……

技术要点

  • 将日志文件写入HDFS中,相对路径PATH为“nova.log”
  • 注意JAVA堆栈异常日志的处理
  • 将解析后的异常日志全部存到SparkSQL中或Hive数据仓库中
  • 通过编写SQL查询一段时间内ERROR级别记录的前10行
  • 统计每分钟的日志记录数,并将统计结果存入mysql数据库中,便于上层应用直接使用计算结果

解析前后对比

解析前:

解析后:

解析代码

LoggerApp.scala:

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.TimestampType

/**
 * 日志解析
 */
object LoggerApp {
  def main(args: Array[String]): Unit = {
    println("<!--开始解析-->")
    val reg = "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3}) (\\[.*\\]) (.*) (.*) - ([\\s\\S]*)$"
    val path = "nova.log"
    val sc = new SparkContext(new SparkConf().setAppName("日志解析"))
    val textRDD = sc.textFile(path)

    /**
     * 处理一条日志包括多行的情况
     */
    var key = ""
    val formatRDD = textRDD.map { x =>
      if (x.matches(reg)) {
        key = x
        Pair.apply(key, "")
      } else {
        Pair.apply(key, x)
      }
    }.reduceByKey((a, b) => { a + "\n" + b }).map(x => x._1 + x._2)

    /**
     * 将字符串转换为Logger
     */
    val loggerRDD: RDD[Logger] = formatRDD.map { x =>
      {
        val reg.r(time, thread, level, logger, msg) = x //通过正则取值
        val log = new Logger(formatDate(time), thread, level, logger, msg)
        log
      }
    }.cache()

    /**
     * TODO 通过类的反射机制来定义数据库Scheme,但在scala语言中不知道为啥就是不成功,此处浪费了许久留着以后研究吧
     */
    /*val sqlc = new SQLContext(sc)
    sqlc.createDataFrame(loggerRDD, classOf[Logger]).registerTempTable("logger")*/

    /**
     * 定义数据库Scheme
     */
    val schemaString = "time thread level logger msg"
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName =>
          if ("time".equals(fieldName))
            StructField(fieldName, TimestampType, true)
          else
            StructField(fieldName, StringType, true)))
    /**
     * 将Logger转换为Row
     */
    val rowRDD = loggerRDD.map { log =>
      Row(
        formatDate(log.time),
        log.thread,
        log.level,
        log.logger,
        log.msg)
    }
    /**
     * 利用SQL进行查询过滤
     */
    //    val sqlc = bySQLContext(sc, rowRDD, schema);
    val sqlc = byHiveContext(sc, rowRDD, schema);
    val df = sqlc.sql("select * from logger where level='ERROR' and time between '2016-03-21 11:00:00' and '2016-03-21 12:00:00' order by time")
    val errLogRDD = df.map { x =>
      new Logger(
        formatDate(x.getTimestamp(0)),
        x.getString(1),
        x.getString(2),
        x.getString(3),
        x.getString(4))
    }
    for (log <- errLogRDD.take(10)) {
      println("time:" + formatDateToStr(log.time))
      println("thread:" + log.thread)
      println("level:" + log.level)
      println("logger:" + log.logger)
      println("msg:" + log.msg)
    }
    println("<!--解析结束-->")
  }
  /**
   * 创建临时表
   */
  def bySQLContext(sc: SparkContext, rowRDD: RDD[Row], schema: StructType): SQLContext = {
    val sqlc = new SQLContext(sc)
    sqlc.createDataFrame(rowRDD, schema).registerTempTable("logger")
    sqlc
  }
  /**
   * 创建永久表,需要提前搭建好Spark与Hive的集成环境
   */
  def byHiveContext(sc: SparkContext, rowRDD: RDD[Row], schema: StructType): SQLContext = {
    val sqlc = new HiveContext(sc)
    sqlc.sql("drop table if exists logger")
    sqlc.sql("CREATE TABLE IF NOT EXISTS logger (time TIMESTAMP, thread STRING, level STRING, logger STRING, msg STRING)")
    sqlc.createDataFrame(rowRDD, schema).write.mode("overwrite").saveAsTable("logger")
    sqlc
  }
  def formatDate(str: String): Date = {
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(str)
  }
  def formatDate(timestamp: java.sql.Timestamp): Date = {
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(timestamp.toString())
  }
  def formatDate(date: Date): java.sql.Timestamp = {
    new java.sql.Timestamp(date.getTime)
  }
  def formatDateToStr(date: Date): String = {
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(date)
  }
}

Logger.scala:

import java.util.Date

class Logger extends Serializable {
  var time: Date = null
  var thread: String = ""
  var level: String = ""
  var logger: String = ""
  var msg: String = ""
  def this(time: Date, thread: String, level: String, logger: String, msg: String) {
    this()
    this.time = time;
    this.thread = thread;
    this.level = level;
    this.logger = logger;
    this.msg = msg;
  }
}

统计并写入Mysql

LoggerMysqlApp.scala:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
import java.util.Date
import java.text.SimpleDateFormat
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.Row
import java.util.Properties

object LoggerMysqlApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("输出写入Mysql"))
    /**
     * 从hive中加载数据
     */
    val hivec = new HiveContext(sc)
    val df = hivec.sql("select * from logger")
    val loggerRDD = df.rdd.map { x =>
      new Logger(
        LoggerApp.formatDate(x.getTimestamp(0)),
        x.getString(1),
        x.getString(2),
        x.getString(3),
        x.getString(4))
    }
    val resultRDD = loggerRDD.map { logger =>
      Pair(formatDateToStr(logger.time), 1)
    }.reduceByKey((a, b) =>
      { a + b }).map(f =>
      Row(f._1, f._2)).sortBy(f => f.getInt(1), false, 2)
    for (r <- resultRDD.take(10)) {
      println(r.getString(0) + ":" + r.getInt(1))
    }
    /**
     * 定义数据库Scheme
     */
    val schemaString = "time count"
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName =>
          if ("time".equals(fieldName))
            StructField(fieldName, StringType, true)
          else
            StructField(fieldName, IntegerType, true)))
    /**
     * TODO计算每分钟日志的个数
     */
    val connectionProperties = new Properties()
    connectionProperties.setProperty("user", "root")
    connectionProperties.setProperty("password", ".")
    new SQLContext(sc).createDataFrame(resultRDD, schema).write.jdbc(
      "jdbc:mysql://192.168.136.128:3306/logger",
      "logger",
      connectionProperties);
  }
  def formatDateToStr(date: Date): String = {
    new SimpleDateFormat("yyyy-MM-dd HH:mm").format(date)
  }
}


目录
相关文章
|
6月前
|
存储 Java 文件存储
微服务——SpringBoot使用归纳——Spring Boot使用slf4j进行日志记录—— logback.xml 配置文件解析
本文解析了 `logback.xml` 配置文件的详细内容,包括日志输出格式、存储路径、控制台输出及日志级别等关键配置。通过定义 `LOG_PATTERN` 和 `FILE_PATH`,设置日志格式与存储路径;利用 `&lt;appender&gt;` 节点配置控制台和文件输出,支持日志滚动策略(如文件大小限制和保存时长);最后通过 `&lt;logger&gt;` 和 `&lt;root&gt;` 定义日志级别与输出方式。此配置适用于精细化管理日志输出,满足不同场景需求。
1514 1
|
6月前
|
存储 缓存 监控
【YashanDB数据库】数据库运行正常,日志出现大量错误metadata changed
数据库运行正常,日志出现大量错误metadata changed
|
9月前
|
SQL 关系型数据库 MySQL
深入解析MySQL的EXPLAIN:指标详解与索引优化
MySQL 中的 `EXPLAIN` 语句用于分析和优化 SQL 查询,帮助你了解查询优化器的执行计划。本文详细介绍了 `EXPLAIN` 输出的各项指标,如 `id`、`select_type`、`table`、`type`、`key` 等,并提供了如何利用这些指标优化索引结构和 SQL 语句的具体方法。通过实战案例,展示了如何通过创建合适索引和调整查询语句来提升查询性能。
1792 10
|
5月前
|
SQL 运维 关系型数据库
MySQL Binlog 日志查看方法及查看内容解析
本文介绍了 MySQL 的 Binlog(二进制日志)功能及其使用方法。Binlog 记录了数据库的所有数据变更操作,如 INSERT、UPDATE 和 DELETE,对数据恢复、主从复制和审计至关重要。文章详细说明了如何开启 Binlog 功能、查看当前日志文件及内容,并解析了常见的事件类型,包括 Format_desc、Query、Table_map、Write_rows、Update_rows 和 Delete_rows 等,帮助用户掌握数据库变化历史,提升维护和排障能力。
|
10月前
|
存储 运维 监控
API明细日志及运维统计日志全面提升API可运维性
在数字化转型的大潮中,数据已成为企业最宝贵的资产之一。而数据服务API可快速为数据应用提供数据接口。面对越来越多的API以及越来越多的应用调用,如何快速查看API的服务情况、异常情况及影响范围,以及查看API的调用详情,进行API的性能优化、错误排查变得越来越重要,本文将介绍如何配置和开通API运维统计及明细日志,以及如何查看日志进行介绍。
519 0
|
6月前
|
存储 监控 算法
基于 PHP 语言的滑动窗口频率统计算法在公司局域网监控电脑日志分析中的应用研究
在当代企业网络架构中,公司局域网监控电脑系统需实时处理海量终端设备产生的连接日志。每台设备平均每分钟生成 3 至 5 条网络请求记录,这对监控系统的数据处理能力提出了极高要求。传统关系型数据库在应对这种高频写入场景时,性能往往难以令人满意。故而,引入特定的内存数据结构与优化算法成为必然选择。
134 3
|
6月前
|
监控 Java 应用服务中间件
Tomcat log日志解析
理解和解析Tomcat日志文件对于诊断和解决Web应用中的问题至关重要。通过分析 `catalina.out`、`localhost.log`、`localhost_access_log.*.txt`、`manager.log`和 `host-manager.log`等日志文件,可以快速定位和解决问题,确保Tomcat服务器的稳定运行。掌握这些日志解析技巧,可以显著提高运维和开发效率。
488 13
|
6月前
|
监控 Shell Linux
Android调试终极指南:ADB安装+多设备连接+ANR日志抓取全流程解析,覆盖环境变量配置/多设备调试/ANR日志分析全流程,附Win/Mac/Linux三平台解决方案
ADB(Android Debug Bridge)是安卓开发中的重要工具,用于连接电脑与安卓设备,实现文件传输、应用管理、日志抓取等功能。本文介绍了 ADB 的基本概念、安装配置及常用命令。包括:1) 基本命令如 `adb version` 和 `adb devices`;2) 权限操作如 `adb root` 和 `adb shell`;3) APK 操作如安装、卸载应用;4) 文件传输如 `adb push` 和 `adb pull`;5) 日志记录如 `adb logcat`;6) 系统信息获取如屏幕截图和录屏。通过这些功能,用户可高效调试和管理安卓设备。
|
6月前
|
人工智能 运维 监控
一招高效解析 Access Log,轻松应对泼天流量
一招高效解析 Access Log,轻松应对泼天流量
106 0
一招高效解析 Access Log,轻松应对泼天流量
|
7月前
|
XML JSON Java
Java中Log级别和解析
日志级别定义了日志信息的重要程度,从低到高依次为:TRACE(详细调试)、DEBUG(开发调试)、INFO(一般信息)、WARN(潜在问题)、ERROR(错误信息)和FATAL(严重错误)。开发人员可根据需要设置不同的日志级别,以控制日志输出量,避免影响性能或干扰问题排查。日志框架如Log4j 2由Logger、Appender和Layout组成,通过配置文件指定日志级别、输出目标和格式。

推荐镜像

更多
  • DNS