开发者社区> 问答> 正文

创建数据处理管道

你想以数据管道(类似Unix管道)的方式迭代处理数据。 比如,你有个大量的数据需要处理,但是不能将它们一次性放入内存中。

展开
收起
哦哦喔 2020-04-16 21:53:43 992 0
1 条回答
写回答
取消 提交回答
  • 生成器函数是一个实现管道机制的好办法。 为了演示,假定你要处理一个非常大的日志文件目录:
    
    foo/
        access-log-012007.gz
        access-log-022007.gz
        access-log-032007.gz
        ...
        access-log-012008
    bar/
        access-log-092007.bz2
        ...
        access-log-022008
    假设每个日志文件包含这样的数据:
    
    124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
    210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
    210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
    61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
    ...
    为了处理这些文件,你可以定义一个由多个执行特定任务独立任务的简单生成器函数组成的容器。就像这样:
    
    import os
    import fnmatch
    import gzip
    import bz2
    import re
    
    def gen_find(filepat, top):
        '''
        Find all filenames in a directory tree that match a shell wildcard pattern
        '''
        for path, dirlist, filelist in os.walk(top):
            for name in fnmatch.filter(filelist, filepat):
                yield os.path.join(path,name)
    
    def gen_opener(filenames):
        '''
        Open a sequence of filenames one at a time producing a file object.
        The file is closed immediately when proceeding to the next iteration.
        '''
        for filename in filenames:
            if filename.endswith('.gz'):
                f = gzip.open(filename, 'rt')
            elif filename.endswith('.bz2'):
                f = bz2.open(filename, 'rt')
            else:
                f = open(filename, 'rt')
            yield f
            f.close()
    
    def gen_concatenate(iterators):
        '''
        Chain a sequence of iterators together into a single sequence.
        '''
        for it in iterators:
            yield from it
    
    def gen_grep(pattern, lines):
        '''
        Look for a regex pattern in a sequence of lines
        '''
        pat = re.compile(pattern)
        for line in lines:
            if pat.search(line):
                yield line
    现在你可以很容易的将这些函数连起来创建一个处理管道。 比如,为了查找包含单词python的所有日志行,你可以这样做:
    
    lognames = gen_find('access-log*', 'www')
    files = gen_opener(lognames)
    lines = gen_concatenate(files)
    pylines = gen_grep('(?i)python', lines)
    for line in pylines:
        print(line)
    如果将来的时候你想扩展管道,你甚至可以在生成器表达式中包装数据。 比如,下面这个版本计算出传输的字节数并计算其总和。
    
    lognames = gen_find('access-log*', 'www')
    files = gen_opener(lognames)
    lines = gen_concatenate(files)
    pylines = gen_grep('(?i)python', lines)
    bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
    bytes = (int(x) for x in bytecolumn if x != '-')
    print('Total', sum(bytes))
    
    2020-04-16 21:53:54
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
凡普实时数据处理 立即下载
效率提升:表格存储实时数据流:Stream的技术揭秘和应用场景 立即下载
凡普实时数据处理架构 立即下载