Spark-项目中分析日志的核心代码

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 代码LogRecord 类:case class LogRecord ( clientIpAddress: String, rfc1413ClientIdentity: String, remoteUser: S...

代码

LogRecord 类:

case class LogRecord (
    clientIpAddress: String,      
    rfc1413ClientIdentity: String,   
    remoteUser: String,             `
    dateTime: String,              //[day/month/year:hour:minute:second zone]
    request: String,                 
    httpStatusCode: String,          
    bytesSent: String,              
    referer: String,                
    userAgent: String                
)

LogParser 解析类

import java.util.regex.Pattern
import java.text.SimpleDateFormat
import java.util.Locale
import scala.util.control.Exception._
import java.util.regex.Matcher
import scala.util.{Try, Success, Failure}

@SerialVersionUID(100L)
class LogParser extends Serializable {

    private val ddd = "\\d{1,3}"                      
    private val ip = s"($ddd\\.$ddd\\.$ddd\\.$ddd)?"  
    private val client = "(\\S+)"                     
    private val user = "(\\S+)"
    private val dateTime = "(\\[.+?\\])"              
    private val request = "\"(.*?)\""                 
    private val status = "(\\d{3})"
    private val bytes = "(\\S+)"                      
    private val referer = "\"(.*?)\""
    private val agent = "\"(.*?)\""
    private val regex = s"$ip $client $user $dateTime $request $status $bytes $referer $agent"
    private val p = Pattern.compile(regex)


    def parseRecord(record: String): Option[AccessLogRecord] = {
        val matcher = p.matcher(record)
        if (matcher.find) {
            Some(buildAccessLogRecord(matcher))
        } else {
            None
        }
    }

    def parseRecordReturningNullObjectOnFailure(record: String): AccessLogRecord = {
        val matcher = p.matcher(record)
        if (matcher.find) {
            buildAccessLogRecord(matcher)
        } else {
            AccessLogParser.nullObjectAccessLogRecord
        }
    }

    private def buildAccessLogRecord(matcher: Matcher) = {
        AccessLogRecord(
            matcher.group(1),
            matcher.group(2),
            matcher.group(3),
            matcher.group(4),
            matcher.group(5),
            matcher.group(6),
            matcher.group(7),
            matcher.group(8),
            matcher.group(9))
    }
}

/**
 * 例子:
 * 94.102.63.11 - - [21/Jul/2009:02:48:13 -0700] "GET / HTTP/1.1" 200 18209 "http://acme.com/foo.php" "Mozilla/4.0 (compatible; MSIE 5.01; Windows NT 5.0)"
 */
object AccessLogParser {

    val nullObjectAccessLogRecord = AccessLogRecord("", "", "", "", "", "", "", "", "")

    def parseRequestField(request: String): Option[Tuple3[String, String, String]] = {
        val arr = request.split(" ")
        if (arr.size == 3) Some((arr(0), arr(1), arr(2))) else None
    }
    def parseDateField(field: String): Option[java.util.Date] = {
        val dateRegex = "\\[(.*?) .+]"
        val datePattern = Pattern.compile(dateRegex)
        val dateMatcher = datePattern.matcher(field)
        if (dateMatcher.find) {
                val dateString = dateMatcher.group(1)
                println("***** DATE STRING" + dateString)
                // HH is 0-23; kk is 1-24
                val dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
                allCatch.opt(dateFormat.parse(dateString))  // return Option[Date]
            } else {
            None
        }
    }

}

总结

日志分析是经常做的事情,大数据下的日志分析也是一个常用技术。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
13天前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1576 12
|
13天前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
37 4
|
13天前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
20 4
|
14天前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
36 2
|
13天前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
29 0
|
1月前
|
缓存 监控 算法
分析慢日志文件来优化 PHP 脚本的性能
分析慢日志文件来优化 PHP 脚本的性能
08-06-06>pe_xscan 精简log分析代码 速度提升一倍
08-06-06>pe_xscan 精简log分析代码 速度提升一倍
|
2月前
|
存储 分布式计算 大数据
【Flume的大数据之旅】探索Flume如何成为大数据分析的得力助手,从日志收集到实时处理一网打尽!
【8月更文挑战第24天】Apache Flume是一款高效可靠的数据收集系统,专为Hadoop环境设计。它能在数据产生端与分析/存储端间搭建桥梁,适用于日志收集、数据集成、实时处理及数据备份等多种场景。通过监控不同来源的日志文件并将数据标准化后传输至Hadoop等平台,Flume支持了性能监控、数据分析等多种需求。此外,它还能与Apache Storm或Flink等实时处理框架集成,实现数据的即时分析。下面展示了一个简单的Flume配置示例,说明如何将日志数据导入HDFS进行存储。总之,Flume凭借其灵活性和强大的集成能力,在大数据处理流程中占据了重要地位。
63 3
|
2月前
|
开发框架 .NET Docker
【Azure 应用服务】App Service .NET Core项目在Program.cs中自定义添加的logger.LogInformation,部署到App Service上后日志不显示Log Stream中的问题
【Azure 应用服务】App Service .NET Core项目在Program.cs中自定义添加的logger.LogInformation,部署到App Service上后日志不显示Log Stream中的问题
|
2月前
|
应用服务中间件 Linux nginx
在Linux中,如何统计ip访问情况?分析 nginx 访问日志?如何找出访问页面数量在前十位的ip?
在Linux中,如何统计ip访问情况?分析 nginx 访问日志?如何找出访问页面数量在前十位的ip?