如何使用 Flupy 构建数据处理管道

简介: 如何使用 Flupy 构建数据处理管道

摄影:产品经理厨师:kingname

经常使用 Linux 的同学,肯定对|这个符号不陌生,这个符号是 Linux 的管道符号,可以把左边的数据传递给右边。

例如我有一个spider.log文件,我想查看里面包含"ERROR"关键词,同时时间为2019-11-23的数据,那么我可以这样写命令:

cat spider.log | grep ERROR | grep "2019-11-23"

但是,如果你想执行更复杂的操作,例如提取关键词fail on: https://xxx.com后面的这个网址,然后对所有获得的网址进行去重,那么虽然 shell 命令也能办到,但写起来却稍显麻烦。

这个时候,你就可以使用 Flupy 来实现你的需求。首先我们使用 Python 3.6 以上的版本安装Flupy:

python3 -m pip install flupy

然后开始写代码,看看这几步操作有多简单:

import re
from flupy import flu
with open('spider.log', encoding='utf-8') as f:
    error_url = (flu(f).filter(lambda x: 'ERROR'in x)
                      .map(lambda x: re.search('fail on: (.*?),', x))
                      .filter(lambda x: x isnotNone)
                      .map(lambda x: x.group(1))
                      .unique())
    for url in error_url:
        print(url)

首先flu接收一个可迭代对象,无论是列表还是生成器都可以。然后对里面的每一条数据应用后面的规则。这个过程都是基于生成器实现的,所以不会有内存不足的问题,对于 PB 级别的数据也不在话下。

在上面的例子中,Flupy获取日志文件的每一行内容,首先使用filter进行过滤,只保留包含ERROR字符串的行。然后对这些行通过map方法执行正则表达式,搜索满足fail on: (.*?)\n的内容。由于有些行有,有些行没有,所以这一步返回的数据有些是 None,有些是正则表达式对象,所以进一步再使用filter关键字,把所有返回None的都过滤掉。然后继续使用map关键字,对每一个正则表达式对象获取.group(1)。并把结果输出。

运行效果如下图所示:

实现了数据的提取和去重。并且整个过程通过 Python 实现,代码也比 Shell 简单直观。

由于Flupy可以接收任何可迭代对象,所以传入数据库游标也是没有问题的,例如从 MongoDB 中读取数据并进行处理的一个例子:

import pymongo
from flupy import flu
handler = pymongo.MongoClient().db.col
cursor = handler.find()
data = flu(cursor).filter(lambda x: x['date'] >= '2019-11-10').map(lambda x: x['text']).take_while(lambda x: 'kingname'in x)

这一段代码的意思是说,从数据库中一行一行检查数据,如果date字段大于2019-11-10就获取text字段的数据,满足一条就获取一条,直到某条数据包含kingname为止。

使用Flupy不仅可以通过写.py文件实现,还可以直接在命令行中执行,例如上面读取spider.log的代码,可以转换为终端命令:

flu -f spider.log "_.filter(lambda x: 'ERROR' in x).map(lambda x: re.search('fail on: (.*?),', x)).filter(lambda x: x is not None).map(lambda x: x.group(1)).unique()" -i re

运行效果如下图所示:

通过-i 参数导入不同的库,无论是系统自带的库或者第三方库都可以。

Flupy 的更多使用参数,可以参阅它的官方文档[1]

目录
相关文章
|
存储 算法 数据处理
Python生成器深度解析:构建强大的数据处理管道
Python生成器深度解析:构建强大的数据处理管道
318 0
|
21天前
|
数据采集 存储 消息中间件
构建高效数据管道:从数据采集到分析的实战指南
在数据的海洋中航行,我们需要精准而高效的工具来捕捉、传输和处理信息。本文将引导你穿越技术性文章的迷雾,用简洁明了的语言和代码示例,展现如何打造一个高性能的数据管道。无论你是初学者还是资深开发者,这篇文章都将为你提供宝贵的知识财富。让我们一起解锁数据的力量,探索其背后的奥秘。
44 15
|
22天前
|
数据采集 分布式计算 大数据
构建高效的数据管道:使用Python进行ETL任务
在数据驱动的世界中,高效地处理和移动数据是至关重要的。本文将引导你通过一个实际的Python ETL(提取、转换、加载)项目,从概念到实现。我们将探索如何设计一个灵活且可扩展的数据管道,确保数据的准确性和完整性。无论你是数据工程师、分析师还是任何对数据处理感兴趣的人,这篇文章都将成为你工具箱中的宝贵资源。
|
2月前
|
数据采集 JSON 数据处理
抓取和分析JSON数据:使用Python构建数据处理管道
在大数据时代,电商网站如亚马逊、京东等成为数据采集的重要来源。本文介绍如何使用Python结合代理IP、多线程等技术,高效、隐秘地抓取并处理电商网站的JSON数据。通过爬虫代理服务,模拟真实用户行为,提升抓取效率和稳定性。示例代码展示了如何抓取亚马逊商品信息并进行解析。
抓取和分析JSON数据:使用Python构建数据处理管道
|
2月前
|
前端开发 JavaScript API
Gulp:高效构建流程中的流式处理利器
【10月更文挑战第13天】Gulp:高效构建流程中的流式处理利器
44 0
|
4月前
|
大数据 机器人 数据挖掘
这个云ETL工具配合Python轻松实现大数据集分析,附案例
这个云ETL工具配合Python轻松实现大数据集分析,附案例
|
4月前
|
传感器 PyTorch 数据处理
流式数据处理:DataLoader 在实时数据流中的作用
【8月更文第29天】在许多现代应用中,数据不再是以静态文件的形式存在,而是以持续生成的流形式出现。例如,传感器数据、网络日志、社交媒体更新等都是典型的实时数据流。对于这些动态变化的数据,传统的批处理方式可能无法满足低延迟和高吞吐量的要求。因此,开发能够处理实时数据流的系统变得尤为重要。
202 1
|
4月前
|
SQL 监控 大数据
"解锁实时大数据处理新境界:Google Dataflow——构建高效、可扩展的实时数据管道实践"
【8月更文挑战第10天】随着大数据时代的发展,企业急需高效处理数据以实现即时响应。Google Dataflow作为Google Cloud Platform的强大服务,提供了一个完全托管的流处理与批处理方案。它采用Apache Beam编程模型,支持自动扩展、高可用性,并能与GCP服务无缝集成。例如,电商平台可通过Dataflow实时分析用户行为日志:首先利用Pub/Sub收集数据;接着构建管道处理并分析这些日志;最后将结果输出至BigQuery。Dataflow因此成为构建实时数据处理系统的理想选择,助力企业快速响应业务需求。
258 6
|
4月前
|
机器学习/深度学习 数据采集 数据可视化
构建高效的数据管道:使用Python进行数据处理和分析
【8月更文挑战第24天】在信息爆炸的时代,数据是新的石油。本文将引导你如何利用Python构建一个高效的数据管道,从数据的获取、清洗到分析,最后实现可视化。我们将探索pandas、NumPy、matplotlib等库的强大功能,并通过实际案例加深理解。无论你是数据科学新手还是希望提升数据处理技能的开发者,这篇文章都将为你提供宝贵的洞见和实用技巧。
|
4月前
|
数据采集 监控 大数据
DATEWORES: 构建高效数据管道的最佳实践
【8月更文第14天】随着大数据技术的发展,数据管道已经成为现代数据处理流程的核心部分。本文旨在探讨如何利用DATEWORES——一个假设性的数据管道构建框架——来设计和实现高效的数据管道。我们将介绍DATEWORES的基本概念、架构设计,并通过具体案例演示如何运用该框架进行数据的抽取、转换与加载(ETL)。此外,我们还将讨论如何保证数据质量及数据完整性。
115 0