Spark-项目中分析日志的核心代码

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 代码LogRecord 类:case class LogRecord ( clientIpAddress: String, rfc1413ClientIdentity: String, remoteUser: S...

代码

LogRecord 类:

case class LogRecord (
    clientIpAddress: String,      
    rfc1413ClientIdentity: String,   
    remoteUser: String,             `
    dateTime: String,              //[day/month/year:hour:minute:second zone]
    request: String,                 
    httpStatusCode: String,          
    bytesSent: String,              
    referer: String,                
    userAgent: String                
)

LogParser 解析类

import java.util.regex.Pattern
import java.text.SimpleDateFormat
import java.util.Locale
import scala.util.control.Exception._
import java.util.regex.Matcher
import scala.util.{Try, Success, Failure}

@SerialVersionUID(100L)
class LogParser extends Serializable {

    private val ddd = "\\d{1,3}"                      
    private val ip = s"($ddd\\.$ddd\\.$ddd\\.$ddd)?"  
    private val client = "(\\S+)"                     
    private val user = "(\\S+)"
    private val dateTime = "(\\[.+?\\])"              
    private val request = "\"(.*?)\""                 
    private val status = "(\\d{3})"
    private val bytes = "(\\S+)"                      
    private val referer = "\"(.*?)\""
    private val agent = "\"(.*?)\""
    private val regex = s"$ip $client $user $dateTime $request $status $bytes $referer $agent"
    private val p = Pattern.compile(regex)


    def parseRecord(record: String): Option[AccessLogRecord] = {
        val matcher = p.matcher(record)
        if (matcher.find) {
            Some(buildAccessLogRecord(matcher))
        } else {
            None
        }
    }

    def parseRecordReturningNullObjectOnFailure(record: String): AccessLogRecord = {
        val matcher = p.matcher(record)
        if (matcher.find) {
            buildAccessLogRecord(matcher)
        } else {
            AccessLogParser.nullObjectAccessLogRecord
        }
    }

    private def buildAccessLogRecord(matcher: Matcher) = {
        AccessLogRecord(
            matcher.group(1),
            matcher.group(2),
            matcher.group(3),
            matcher.group(4),
            matcher.group(5),
            matcher.group(6),
            matcher.group(7),
            matcher.group(8),
            matcher.group(9))
    }
}

/**
 * 例子:
 * 94.102.63.11 - - [21/Jul/2009:02:48:13 -0700] "GET / HTTP/1.1" 200 18209 "http://acme.com/foo.php" "Mozilla/4.0 (compatible; MSIE 5.01; Windows NT 5.0)"
 */
object AccessLogParser {

    val nullObjectAccessLogRecord = AccessLogRecord("", "", "", "", "", "", "", "", "")

    def parseRequestField(request: String): Option[Tuple3[String, String, String]] = {
        val arr = request.split(" ")
        if (arr.size == 3) Some((arr(0), arr(1), arr(2))) else None
    }
    def parseDateField(field: String): Option[java.util.Date] = {
        val dateRegex = "\\[(.*?) .+]"
        val datePattern = Pattern.compile(dateRegex)
        val dateMatcher = datePattern.matcher(field)
        if (dateMatcher.find) {
                val dateString = dateMatcher.group(1)
                println("***** DATE STRING" + dateString)
                // HH is 0-23; kk is 1-24
                val dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
                allCatch.opt(dateFormat.parse(dateString))  // return Option[Date]
            } else {
            None
        }
    }

}

总结

日志分析是经常做的事情,大数据下的日志分析也是一个常用技术。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
18天前
|
存储 SQL 监控
|
18天前
|
运维 监控 安全
|
21天前
|
监控 关系型数据库 MySQL
分析慢查询日志
【10月更文挑战第29天】分析慢查询日志
37 3
|
21天前
|
监控 关系型数据库 数据库
怎样分析慢查询日志?
【10月更文挑战第29天】怎样分析慢查询日志?
34 2
|
24天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
29天前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
125 2
|
24天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
69 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
63 0
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
42 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
92 0
下一篇
无影云桌面