Java程序员清洗数据的小故事

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 阿里云SLS YYDS,世上无难事,只要肯攀登,办法总比困难多

背景

公司在已有IM项目中开发新业务功能,后端没埋点,团队没有BI,领导提了几个数据指标,统计每天A聊天功能使用人数,B功能功能使用人数,整体功能使用人数,还有就是最离谱的,要统计一次消息发送在服务器内消耗的时间。

了解到该项目线上有8台服务器,服务器日志存储7天,没有权限直接下载日志;接了阿里云SLS,SLS上会存120天的日志,超过天数的会放在阿里云oss 上进行冷备。

跟业务开发的同事明确日志格式以及相关指标数据提取对应的日志特征。关于消息耗时,就针对业务相关的消息,取了一百条消息的相关日志,进行取平均值,实在是不能每条都监控到。


开搞

计划

统计人数的任务

方案1,通过阿里云的sls 编写查询语句进行查询统计,下载少量日志编写代码进行验证sls 数据是否准确;

方案2,通过运维下载相关周期的日志,编写相关清洗代码,进行统计

统计耗时的任务

方案,通过上面查到的发消息人可确定消息内容,基于消息内容检索全部日志,整理贯穿服务的日志记录来计算消息转发耗时。

统计方案1 实践

阿里云sls 功能是采集服务日志,构建相关索引,提供日志聚合,搜索服务。当然其强大的支持sql 进行查询的功能,属实是强,但是也存在一定的学习成本。话不多说,来活儿了,就先想想怎么干。

image.png

既然有sls 这种神器(对我而言确实是),就直接用sls 进行搜索,这样能节省一定采集数据的时间,也不用劳烦运维同学。没有明确要哪一天的日志,我就先写一个能查出数据的语句出来。

那先基于A聊天功能实现一个搜索语句

__tag__:__path__: "/home/logs/im/MsgRcvMoblie.log" and "RCV from\:" and "groupchat" 
| select  
approx_distinct(cast(split(cast(split(content, ';') as array(varchar))[1],'from:') as array(varchar))[2]) AS num 
where content like '%hjbp%'

解释一下上面的语句:

第一行是明确日志文件以及相关日志格式,相当于明确数据源;

第二行的’|‘ 相当于linux 中的管道,这样就可以通过sql 对前面的数据源进行格式化;

第三行使用到了sls 支持的几个函数,具体可以通过阿里云日志服务帮助文档 搜索查看;

第四行使用了支持的like 模糊查询。

以上语句按一天的时间范围执行下来查询结果是不精确的。

image.png

为了能精确一些,想着一天我分三次查询缩小时间范围是可以做到精确查询的,那么这个语句就是可用的了,接下来进行验证逻辑的编写。

直接通过sls 是可以下载少量日志的,而且是csv 格式的,通过Java 让我从心底里抵触,就选择了5年前学了点儿皮毛的python 进行处理。大概的处理跟上面语句很类似,只是数据源比较多,通过pandas 库很快的读取csv 文件中的日志,并且方便处理成文件。

很快啊,验证的结果也是类似的贴一下我low low的代码

# 获取有关发消息各个指标数据
def get_data():
    # 读取日志
    f = open('D:/work/log/downloaded_data.txt',encoding='UTF-8')
    # 设置一个set 存用户id
    hjbp = set()
    for line in f:
        jSon = json.loads(line)
        content = jSon['content']
        # 获取符合标准的数据,有了上面sls 的准备写起来也就轻松了
        if content.__contains__('RCV from') and content.__contains__('groupchat') and  content.__contains__('hjbp'):
            hjbp.add(str.split(str.split(content,";")[0],"from:")[1])
    print("hjbp:", len(hjbp))
#main 函数执行
if __name__ == '__main__':
    get_data()

跟运维要了一天的数据,然后执行对比sls 查到的结果,一比一完美复刻,嗯,阿里云sls yyds。

上面这代码,Java 也是可以实现的啊,怎么就非得用python呢?实际上,我写完sls 的查询语句后,直接实现第二个任务,就是统计耗时的任务了,耗时任务实现中详解。

可天不遂人愿,产品需求是要30天的数据,sls 上述语句,5个指标分别执行,一天一个指标要执行3次,30天,我好崩溃,这5*3*30 查询次数谁顶得住啊,sls 咋这么废物啊。赶紧想解决方案,好在写了验证脚本,方案一其实已经实现了大半的方案二功能。在上述脚本中增加mysql 中的group by 功能即可实现,哈哈哈。不禁佩服起自己的严谨。

# 获取有关发消息各个指标数据
def get_data():
    # 读取日志
    f = open('D:/work/log/downloaded_data.txt',encoding='UTF-8')
    # 设置一个set 存用户id
    hjbp = set()
    # 声明两个list
    date = []
    bplist = []
    t = "2022-03-23"
    for line in f:
        jSon = json.loads(line)
        content = jSon['content']
        nt = time.strftime('%Y-%m-%d', time.localtime(int(jSon['__time__'])))
        if not(t.__eq__(nt)):
            date.append(t)
            bplist.append(len(hjbp))
            t = nt
            hjbp.clear()
        # 获取符合标准的数据,有了上面sls 的准备写起来也就轻松了
        if content.__contains__('RCV from') and content.__contains__('groupchat') and  content.__contains__('hjbp'):
            hjbp.add(str.split(str.split(content,";")[0],"from:")[1])
    # 字典中的key值即为csv中列名 输出到csv
    dataframe = pd.DataFrame({'date': date, 'bpcount': bplist, 'sjcount': sjlist, 'tmcount': tmlist, 'pccount': plist, 'cyjcount': cyjlist})
    dataframe.to_csv("D:/work/log/test.csv", index=False, sep=',')
#main 函数执行
if __name__ == '__main__':
    get_data()

好在第一次跟运维要数据,给的是跨天的24小时数据,还能基于sls 验证一下这个脚本的实用性,完美。

可数据来源又成了问题,一天的日志100多m,30天的日志数据下载,无疑也是对运维工作难度的强迫,谁让咱心善呢,那sls 肯定开放api 吧,我写脚本基于写好的语句进行调用,不禁对自己更加佩服了。

api 调用的权限走工单,开好了发现不能用,运维那边还有事儿,我先准备好调用的代码,可没权限无法验证写的脚本是否能用。巧妇难为无米之炊,实在无奈,情况急转直下,眼看不能如期交付任务,一个比较好的运维弟弟给我带来了曙光。帮我提了一个阿里云的工单,跟工作人员反馈之后,建议点开sql 增强就行了。

赶紧实验,打开下图的sql 增强,点开之后会提示有费用,果然,阿里云就是这么会挣钱。实验下来,确实精准查30天你的数据也是可以的。

image.png

__tag__:__path__: "/home/logs/im/MsgRcvMoblie.log" and "RCV from\:" and "groupchat" 
| select 
  approx_distinct(cast(split(cast(split(content, ';') as array(varchar))[1],'from:') as array(varchar))[2]) AS bpnum ,date_format(__time__,'%Y-%m-%d') as d 
  where content like '%hjbp%' group by d  order by d 

优化最开始的查询语句,加上按天归集的group by 语句,搜索范围改成一个月,非常不错,sls yyds 哈哈哈哈。至此,圆满完成相关指标的统计,这要自己一个劲儿的搜,不得累死啊。

耗时任务

当写完sls 语句的时候,我觉得已经完成了大半的任务,接下来,搞这个耗时的吧,本来想着搜到相关数据之后,在sls 上直接看,肉眼找到接收时间跟发送时间,然后作差。当我搜了10条日志之后,我发现不对劲儿,这tm也很累,一个多小时才搞了这么点儿。

那时候还没想着通过api 进行查询数据,先下载好日志样本,也就是那一百条消息。然后到sls 上搜,基本上能搜到相关的10几条日志,然后下载,下载下来的是csv 格式的数据。用Java 读是不是很头疼吧。我认为是比较麻烦的,就选择了用python 处理。

写脚本,下数据。

# 获取有关发消息之间差值方法
def make_data():
    ave = 0
    for info in os.listdir('D:/downloads/csv'):
        domain = os.path.abspath('D:/downloads/csv')
        fn = os.path.join(domain, info)
        content = pd.read_csv(fn,encoding='ISO-8859-1')
        max = 1
        min = 1
        for i in range(len(content)):
            stime = str(content['content'][i]).split(' ')[0].replace('_',' ')
            if 'nan' == stime:
                continue
            time = dt.datetime.strptime(stime, '%Y-%m-%d %H:%M:%S.%f').timestamp() * 1000
            if(i == 0):
                max = time
                min = time
                continue
            if(max < time):
                max = time
                continue
            if(time < min):
                min = time
                continue
        ave += max - min
    print(ave / 100)

当我把所有日志数据下载下来之后,通过这样一个函数,卡卡卡,就得到了100条日志的耗时一相加一平均,害。齐活儿。

其实上面数据是不准的,希望有机会能在现有的业务中加点儿埋点,记录好消息在服务端的耗时。

总结

世上无难事,只要肯攀登,办法总比困难多;虽然上面代码比较简陋,而且简单。为什么鄙人还好意思拿出来讲呢,实在是这种锲而不舍的精神,解决了一个又一个的难题,让我精神大好,荣光换发,而且也体会到了这种结构性的数据日志,通过代码处理起来是最轻松的。而且也不禁感叹阿里云的sls 功能强大,但是使用起来入门还是有一定的成本的,比如说帮助文档里的那么多函数。希望有一天能出现识别人语言并实现需求的AI,大家都能轻松应对工作。

大家加油!!!

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
4天前
|
存储 安全 Java
"Java编码魔法:揭秘图片与文件的Base64神秘转换术,让数据在指尖跳跃!"
【8月更文挑战第16天】Base64编码在Java开发中常用于将二进制数据如图片转换为ASCII字符串以便传输。编码使用64个字符及等号填充,每3字节数据编码为4个字符。Java利用`java.util.Base64`类实现此功能:读取图片或文件为字节数组后进行编码。解码时将Base64字符串还原为字节数组并写入文件。需注意编码效率降低、不提供安全性及特殊字符兼容性等问题。掌握这些技巧有助于解决Web开发中的数据传输需求。
17 4
|
6天前
|
监控 Java
Java文件夹复制解决方案:优化大文件与大量数据的处理
Java中复制文件夹及其内容,尤其是当处理大文件或文件夹(如几个GB)时,需要特别注意内存使用和性能优化。以下是一个详细的指导,包括如何避免内存溢出异常,并确保复制过程的高效性。
15 1
|
9天前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
28 3
|
15天前
|
存储 安全 Java
揭秘Java序列化神器Serializable:一键解锁对象穿越时空的超能力,你的数据旅行不再受限,震撼登场!
【8月更文挑战第4天】Serializable是Java中的魔术钥匙,开启对象穿越时空的能力。作为序列化的核心,它让复杂对象的复制与传输变得简单。通过实现此接口,对象能被序列化成字节流,实现本地存储或网络传输,再通过反序列化恢复原状。尽管使用方便,但序列化过程耗时且存在安全风险,需谨慎使用。
27 7
|
19天前
|
监控 前端开发 JavaScript
|
28天前
|
Java 程序员 C++
大牛程序员用Java手写JVM:刚好够运行 HelloWorld
大牛程序员用Java手写JVM:刚好够运行 HelloWorld
|
5天前
|
前端开发 Java
如何实现 Java SpringBoot 自动验证入参数据的有效性
如何实现 Java SpringBoot 自动验证入参数据的有效性
14 0
|
5天前
|
存储 缓存 Java
Java本地高性能缓存实践问题之使用@CachePut注解来更新缓存中的数据的问题如何解决
Java本地高性能缓存实践问题之使用@CachePut注解来更新缓存中的数据的问题如何解决
|
6天前
|
设计模式 前端开发 Java
Spring,作为Java程序员的你能想到什么呢?
该文章主要介绍了Spring框架对于Java程序员的意义,包括Spring框架的一些核心能力和为什么它是如此重要。
|
1月前
|
Java 关系型数据库 数据库
实时计算 Flink版操作报错合集之拉取全量数据时,如何解决Checkpoint失败并且报错为 "java.lang.OutOfMemoryError: Java heap space"
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。