Python 中整洁的并行输出

简介: Python 中整洁的并行输出

Python 并行输出

使用进程和锁并行输出多个任务的状态。

注:以下代码在linux下可用,windows下可能要进行修改。

假设你有一个程序,它对列表进行一些处理:

def log(repo_name, *args):
    print(f"{repo_name}:", *args)

def randsleep():
    import random
    import time
    time.sleep(random.randint(1, 5))

def func(repo_name):
    log(repo_name, "Starting")
    randsleep()  # Can be substituted for actual work
    log(repo_name, "Installing")
    randsleep()
    log(repo_name, "Building")
    randsleep()
    log(repo_name, "Instrumenting")
    randsleep()
    log(repo_name, "Running tests")
    randsleep()
    log(repo_name, f"Result in {repo_name}.json")

repos = ["repoA", "repoB", "repoC", "repoD"]
for repo in repos:
    func(repo)

这很好。它有效。有点吵,但有效。但随后你发现了一件好事:你的程序是数据并行。也就是说,您可以并行处理:

import multiprocessing

# ...

with multiprocessing.Pool() as pool:
    pool.map(func, repos, chunksize=1)

不幸的是,输出有点笨拙。虽然每行仍然很好输出一个 repo,但它正在左右喷出行,并且这些行是混合的。

幸运的是,StackOverflow 用户 Leedehai是终端专业用户,知道如何在控制台中一次重写多行。我们可以根据自己的需要调整这个答案:

def fill_output():
    to_fill = num_lines - len(last_output_per_process)
    for _ in range(to_fill):
        print()

def clean_up():
    for _ in range(num_lines):
        print("\x1b[1A\x1b[2K", end="")  # move up cursor and delete whole line

def log(repo_name, *args):
    with terminal_lock:
        last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
        clean_up()
        sorted_lines = last_output_per_process.items()
        for repo_name, last_line in sorted_lines:
            print(f"{repo_name}: {last_line}")
        fill_output()

def func(repo_name):
    # ...
    with terminal_lock:
        del last_output_per_process[repo_name]

# ...

repos = ["repoA", "repoB", "repoC", "repoD"]
num_procs = multiprocessing.cpu_count()
num_lines = min(len(repos), num_procs)
with multiprocessing.Manager() as manager:
    last_output_per_process = manager.dict()
    terminal_lock = manager.Lock()
    fill_output()
    with multiprocessing.Pool() as pool:
        pool.map(func, repos, chunksize=1)
    clean_up()

这会将每个项目的状态(一次一行)打印到终端。它将按项目添加到的 last_output_per_process 顺序打印,但您可以通过(例如)按字母数字排序来更改它: sorted(last_output_per_process.items())

请注意,我们必须锁定数据结构和终端输出,以避免事情被破坏;它们在过程之间共享(pickled,via Manager )。


如果日志输出有多行长,或者其他人正在用 stdout / stderr (也许是流浪的 print )搞砸,我不确定这会做什么。如果您发现或有整洁的解决方案,请写信。


这种技术对于任何具有线程和锁的编程语言来说可能是相当可移植的。关键的区别在于这些实现应该使用线程而不是进程;我做进程是因为它是 Python。

最终版

import multiprocessing
import random
import time


class Logger:
    def __init__(self, num_lines, last_output_per_process, terminal_lock):
        self.num_lines = num_lines
        self.last_output_per_process = last_output_per_process
        self.terminal_lock = terminal_lock

    def fill_output(self):
        to_fill = self.num_lines - len(self.last_output_per_process)
        for _ in range(to_fill):
            print()

    def clean_up(self):
        for _ in range(self.num_lines):
            print("\x1b[1A\x1b[2K", end="")  # move up cursor and delete whole line

    def log(self, repo_name, *args):
        with self.terminal_lock:
            self.last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
            self.clean_up()
            sorted_lines = self.last_output_per_process.items()
            for repo_name, last_line in sorted_lines:
                print(f"{repo_name}: {last_line}")
            self.fill_output()

    def done(self, repo_name):
        with self.terminal_lock:
            del self.last_output_per_process[repo_name]


class MultiprocessingLogger(Logger):
    def __init__(self, num_lines, manager):
        super().__init__(num_lines, manager.dict(), manager.Lock())


class FakeLock:
    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        pass


class SingleProcessLogger(Logger):
    def __init__(self, num_lines):
        super().__init__(num_lines, {}, FakeLock())


def randsleep():
    time.sleep(random.randint(1, 2) / random.randint(1, 5))


def func(repo_name):
    logger.log(repo_name, "Starting")
    randsleep()
    logger.log(repo_name, "Installing")
    randsleep()
    logger.log(repo_name, "Building")
    randsleep()
    logger.log(repo_name, "Instrumenting")
    randsleep()
    logger.log(repo_name, "Running tests")
    randsleep()
    logger.log(repo_name, f"Result in {repo_name}.json")
    randsleep()
    logger.done(repo_name)


def multi_process_demo():
    ascii_uppercase = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    repos = [f"repo{letter}" for letter in ascii_uppercase]
    num_procs = multiprocessing.cpu_count()
    num_lines = min(len(repos), num_procs)
    with multiprocessing.Manager() as manager:
        global logger
        logger = MultiprocessingLogger(num_lines, manager)
        # Make space for our output
        logger.fill_output()
        with multiprocessing.Pool(num_procs) as pool:
            pool.map(func, repos, chunksize=1)
        logger.clean_up()


def single_process_demo():
    repo = "repoA"
    num_lines = 1
    global logger
    logger = SingleProcessLogger(num_lines)
    logger.fill_output()
    func(repo)
    logger.clean_up()

if __name__ == "__main__":
    multi_process_demo()
    # single_process_demo()
相关文章
|
7月前
|
Python
解释Python中的并发编程和并行编程之间的区别。
解释Python中的并发编程和并行编程之间的区别。
51 0
|
5月前
|
Python
解锁Python并发新世界:线程与进程的并行艺术,让你的应用性能翻倍!
【7月更文挑战第9天】并发编程**是同时执行多个任务的技术,提升程序效率。Python的**threading**模块支持多线程,适合IO密集型任务,但受GIL限制。**multiprocessing**模块允许多进程并行,绕过GIL,适用于CPU密集型任务。例如,计算平方和,多线程版本使用`threading`分割工作并同步结果;多进程版本利用`multiprocessing.Pool`分块计算再合并。正确选择能优化应用性能。
38 1
|
2月前
|
并行计算 安全 Java
Python 多线程并行执行详解
Python 多线程并行执行详解
74 3
|
3月前
|
Python
Python中的zip:高效处理并行迭代的利器
Python中的zip:高效处理并行迭代的利器
30 0
|
5月前
|
并行计算 Python
python 并发与并行
【7月更文挑战第21天】
45 5
python 并发与并行
|
4月前
|
机器学习/深度学习 并行计算 算法
Python中最简单易用的并行加速技巧
Python中最简单易用的并行加速技巧
|
5月前
|
SQL 并行计算 API
Dask是一个用于并行计算的Python库,它提供了类似于Pandas和NumPy的API,但能够在大型数据集上进行并行计算。
Dask是一个用于并行计算的Python库,它提供了类似于Pandas和NumPy的API,但能够在大型数据集上进行并行计算。
|
4月前
|
并行计算 大数据 Java
高效数据处理:使用Python实现并行计算的技巧
传统的数据处理方式在面对大数据时可能效率不高,本文探讨如何利用Python中的并行计算技术来提升数据处理速度和效率,重点介绍了多线程和多进程的应用,以及如何选择合适的场景使用这些技术。
|
6月前
|
开发框架 并行计算 安全
Python的GIL限制了CPython在多核下的并行计算,但通过替代解释器(如Jython, IronPython, PyPy)和多进程、异步IO可规避
【6月更文挑战第26天】Python的GIL限制了CPython在多核下的并行计算,但通过替代解释器(如Jython, IronPython, PyPy)和多进程、异步IO可规避。Numba、Cython等工具编译优化代码,未来社区可能探索更高级的并发解决方案。尽管GIL仍存在,现有策略已能有效提升并发性能。
72 3