python3多进程实战(python3经典编程案例)

简介: 该文章提供了Python3中使用多进程的实战案例,展示了如何通过Python的标准库`multiprocessing`来创建和管理进程,以实现并发任务的执行。

进程是操作系统进行资源分配和调度的基本单位,进程之间是通过轮流占用cpu来执行的。

一. 创建进程的类Process

multiprocessing模块提供了一个创建进程的类Process,创建进程有一下两种方法

  • 创建一个Process类的实例,并制定目标任务函数;
  • 自定义一个类并继承Process类,重写其__init__方法和run方法。

1.1 对比单进程和多进程耗时

第一种方法:

from multiprocessing import Process
import os
import time
# 子进程要执行的代码
def task_process(delay):
    num  = 0
    for i in range(delay*100000000):
        num+=i
    print(f"进程pid为 {os.getpid()},执行完成")

if __name__=='__main__':
    print( '父进程pid为 %s.' % os.getpid())
    t0 = time.time()
    task_process(3)
    task_process(3)
    t1 = time.time()
    print(f"顺序执行耗时 {t1-t0} ")
    p0 = Process(target=task_process, args=(3,))
    p1 = Process(target=task_process, args=(3,))
    t2 = time.time()
    p0.start();p1.start()
    p0.join();p1.join()
    t3 = time.time()
    print(f"多进程并发执行耗时 {t3-t2}")

第二种方法: 自定义一个类并继承Process类

from multiprocessing import Process
import os
import time


class MyProcess(Process):
    def __init__(self, delay):
        super().__init__()
        self.delay = delay

    # 子进程要执行的代码
    def run(self):
        num = 0
        #for i in range(self.delay * 100000000):
        for i in range(self.delay * 100000):
            num += i
        print(f"进程pid为 {os.getpid()},执行完成")


if __name__ == "__main__":
    print("父进程pid为 %s." % os.getpid())
    p0 = MyProcess(3)
    p1 = MyProcess(3)
    t0 = time.time()
    print(p0.authkey)
    p0.start()
    p1.start()
    p0.join()
    p1.join()
    t1 = time.time()
    print(f"多进程并发执行耗时 {t1-t0}")

源码:

class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analogous to `threading.Thread`
    '''
    def _Popen(self):
        raise NotImplementedError

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={
   },
                 *, daemon=None):
        assert group is None, 'group argument must be None for now'
        count = next(_process_counter)
        self._identity = _current_process._identity + (count,)
        self._config = _current_process._config.copy()
        self._parent_pid = os.getpid()
        self._popen = None
        self._closed = False
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        self._name = name or type(self).__name__ + '-' + \
                     ':'.join(str(i) for i in self._identity)
        if daemon is not None:
            self.daemon = daemon
        _dangling.add(self)

参数说明:

  • target: 表示调用对象,一般为函数,也可以为类;
  • args: 表示调用对象的位置参数元组;
  • kwargs: 表示调用对象的字典;
  • name: 进程的别名;
  • group: 参数不使用,可忽略

类提供的常用方法:

  • is_alive(): 返回进程是否是激活的;
  • join(): 可传入超时时间,阻塞进程,知道进程执行完成或超时或进程被终止;
  • run(): 代表进程执行的任务函数,可被重写;
  • start(): 激活进程;
  • terminate(): 终止进程

属性:

  • authkey: 字节码,进程的准秘钥;
  • daemon: 父进程终止后自动终止,且不能产生新的进程,必须在start()之前设置;
  • exitcode: 退出码,进程在运行时为None;
  • name: 获取进程名称;
  • pid: 进程id

1.2 daemon属性对比

from multiprocessing import Process
import os
import time
# 子进程要执行的代码
def task_process(delay):
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行开始。")
    print(f"sleep {delay}s")
    time.sleep(delay)
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行结束。")

if __name__=='__main__':
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行开始。")
    p0 = Process(target=task_process, args=(3,))
    p0.start()
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行结束。")

设置daemon属性

from multiprocessing import Process
import os
import time
# 子进程要执行的代码
def task_process(delay):
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行开始。")
    print(f"sleep {delay}s")
    time.sleep(delay)
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 子进程执行结束。")

if __name__=='__main__':
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行开始。")
    p0 = Process(target=task_process, args=(3,))
    #设置 daemon属性为True, 只要主程序运行结束,程序即退出
    p0.daemon = True
    p0.start()
    p0.join()
    print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 父进程执行结束。")

二. 进程并发控制之Semaphore

用来控制对共享资源的访问数量,可以控制同一时刻并发的进程数。

多进程同步控制:

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(time.strftime('%H:%M:%S'),multiprocessing.current_process().name + " 获得锁运行")
    time.sleep(i)
    print(time.strftime('%H:%M:%S'),multiprocessing.current_process().name + " 释放锁结束")
    s.release()

if __name__ == "__main__":
    # 同一时刻只有两个进程在执行操作
    s = multiprocessing.Semaphore(2)
    for i in range(6):
        p = multiprocessing.Process(target = worker, args=(s, 2))
        p.start()

三. 进程同步之lock

如果某一时间只能有一个进程访问某个共享资源,这种情形就需要使用锁

多个进程输出信息,不加锁:

import multiprocessing
import time


def task1():
    n = 5
    while n > 1:
        print(f"{time.strftime('%H:%M:%S')} task1 输出信息")
        time.sleep(1)
        n -= 1


def task2():
    n = 5
    while n > 1:
        print(f"{time.strftime('%H:%M:%S')} task2 输出信息")
        time.sleep(1)
        n -= 1


def task3():
    n = 5
    while n > 1:
        print(f"{time.strftime('%H:%M:%S')} task3 输出信息")
        time.sleep(1)
        n -= 1


if __name__ == "__main__":
    p1 = multiprocessing.Process(target=task1)
    p2 = multiprocessing.Process(target=task2)
    p3 = multiprocessing.Process(target=task3)
    p1.start()
    p2.start()
    p3.start()

上面代码同一时刻有2个进程在打印信息,实际应用中,可能会造成混乱。

现在修改一下程序,同一时刻仅有一个进程输出信息,加锁:

import multiprocessing
import time


def task1(lock):
    # 使用上下文管理器with来写
    with lock:
        n = 5
        while n > 1:
            print(f"{time.strftime('%H:%M:%S')} task1 输出信息")
            time.sleep(1)
            n -= 1


def task2(lock):
    lock.acquire()
    n = 5
    while n > 1:
        print(f"{time.strftime('%H:%M:%S')} task2 输出信息")
        time.sleep(1)
        n -= 1
    lock.release()


def task3(lock):
    lock.acquire()
    n = 5
    while n > 1:
        print(f"{time.strftime('%H:%M:%S')} task3 输出信息")
        time.sleep(1)
        n -= 1
    lock.release()


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    p1 = multiprocessing.Process(target=task1, args=(lock,))
    p2 = multiprocessing.Process(target=task2, args=(lock,))
    p3 = multiprocessing.Process(target=task3, args=(lock,))
    p1.start()
    p2.start()
    p3.start()

四. 进程同步至Event

Event用来实现进程之间的同步通信

下面代码定义了2个进程函数,一个用于等待事件发生,另一个用于等待事件发生并设置超时时间,主进程调用事件的set()方法唤醒等待事件的进程,唤醒后用clear()方法清除事件的状态并重新等待,以此达到进程的同步控制。

import multiprocessing
import time


def wait_for_event(e):
    e.wait()
    time.sleep(1)
    e.clear()
    print(f'{time.strftime("%H:%M%S")} 进程A 等')
    e.wait()
    print(f'{time.strftime("%H:%M%S")} 进程A 一起走')


def wait_for_timeout(e, t):
    e.wait()
    time.sleep(1)
    e.clear()
    print(f'{time.strftime("%H:%M%S")} 进程B 最多等{t}秒')
    e.wait()
    print(f'{time.strftime("%H:%M%S")} 进程B 继续走')


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(target=wait_for_event, args=(e,))
    w2 = multiprocessing.Process(target=wait_for_timeout, args=(e, 3))
    w1.start()
    w2.start()
    print(f'{time.strftime("%H:%M%S")} 主进程 需要5秒')
    e.set()
    time.sleep(8)
    print(f'{time.strftime("%H:%M%S")} 主进程 赶上')
    e.set()
    w1.join()
    w2.join()
    print(f'{time.strftime("%H:%M%S")} 主进程 退出')

五. 进程优先队列Queue

使用多进程实现生产者-消费者模式

from multiprocessing import Process,Queue
import time

def ProducerA(q):
    count = 1
    while True:
        q.put(f"冷饮 {count}")
        print(f"{time.strftime('%H:%M:%S')} A 放入:[冷饮 {count}]")
        count +=1
        time.sleep(1)

def  ConsumerB(q):
    while True:
        print(f"{time.strftime('%H:%M:%S')} B 取出 [{q.get()}]")
        time.sleep(5)
if __name__ == '__main__':
    q = Queue(maxsize=5)
    p = Process(target=ProducerA,args=(q,))
    c = Process(target=ConsumerB,args=(q,))
    c.start()
    p.start()
    c.join()
    p.join()

上面代码定义了生产者和消费者函数,队列最大容量为5,生产者A生产的速度较快,当队列满时,等待消费者B取出后才能继续放入。

六. 多进程之进程池 pool

pool可以提供指定数量的进程供用户调用,池没满可以接受新的请求到池中,池满该请求就会等待,知道池中有进程结束才会创建新的进程。

#coding: utf-8
import multiprocessing
import time

def task(name):
    print(f"{time.strftime('%H:%M:%S')}: {name} 开始执行")
    time.sleep(3)

if __name__ == "__main__":
    # 同一时刻有3个进程在执行
    pool = multiprocessing.Pool(processes = 3)
    for i in range(10):
        #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        pool.apply_async(func = task, args=(i,))
    pool.close()
    pool.join()
    print("hello")

七. 多进程之数据交换pipe

import multiprocessing
import time


def task1(pipe):
    for i in range(5):
        str = f"task1-{i}"
        print(f"{time.strftime('%H:%M:%S')} task1 发送:{str}")
        pipe.send(str)
    time.sleep(2)
    for i in range(5):
        print(f"{time.strftime('%H:%M:%S')} task1 接收: { pipe.recv() }")


def task2(pipe):
    for i in range(5):
        print(f"{time.strftime('%H:%M:%S')} task2 接收: { pipe.recv() }")
    time.sleep(1)
    for i in range(5):
        str = f"task2-{i}"
        print(f"{time.strftime('%H:%M:%S')} task2 发送:{str}")
        pipe.send(str)


if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=task1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=task2, args=(pipe[1],))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

程序定义了2个子进程函数: task1先发送5条消息,再接收消息;task2先接收消息,再发送消息。

相关文章
|
15天前
|
人工智能 数据可视化 数据挖掘
探索Python编程:从基础到高级
在这篇文章中,我们将一起深入探索Python编程的世界。无论你是初学者还是有经验的程序员,都可以从中获得新的知识和技能。我们将从Python的基础语法开始,然后逐步过渡到更复杂的主题,如面向对象编程、异常处理和模块使用。最后,我们将通过一些实际的代码示例,来展示如何应用这些知识解决实际问题。让我们一起开启Python编程的旅程吧!
|
14天前
|
存储 数据采集 人工智能
Python编程入门:从零基础到实战应用
本文是一篇面向初学者的Python编程教程,旨在帮助读者从零开始学习Python编程语言。文章首先介绍了Python的基本概念和特点,然后通过一个简单的例子展示了如何编写Python代码。接下来,文章详细介绍了Python的数据类型、变量、运算符、控制结构、函数等基本语法知识。最后,文章通过一个实战项目——制作一个简单的计算器程序,帮助读者巩固所学知识并提高编程技能。
|
2天前
|
Unix Linux 程序员
[oeasy]python053_学编程为什么从hello_world_开始
视频介绍了“Hello World”程序的由来及其在编程中的重要性。从贝尔实验室诞生的Unix系统和C语言说起,讲述了“Hello World”作为经典示例的起源和流传过程。文章还探讨了C语言对其他编程语言的影响,以及它在系统编程中的地位。最后总结了“Hello World”、print、小括号和双引号等编程概念的来源。
97 80
|
1天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
13 2
|
14天前
|
小程序 开发者 Python
探索Python编程:从基础到实战
本文将引导你走进Python编程的世界,从基础语法开始,逐步深入到实战项目。我们将一起探讨如何在编程中发挥创意,解决问题,并分享一些实用的技巧和心得。无论你是编程新手还是有一定经验的开发者,这篇文章都将为你提供有价值的参考。让我们一起开启Python编程的探索之旅吧!
40 10
|
15天前
|
IDE 程序员 开发工具
Python编程入门:打造你的第一个程序
迈出编程的第一步,就像在未知的海洋中航行。本文是你启航的指南针,带你了解Python这门语言的魅力所在,并手把手教你构建第一个属于自己的程序。从安装环境到编写代码,我们将一步步走过这段旅程。准备好了吗?让我们开始吧!
|
14天前
|
人工智能 数据挖掘 开发者
探索Python编程之美:从基础到进阶
本文是一篇深入浅出的Python编程指南,旨在帮助初学者理解Python编程的核心概念,并引导他们逐步掌握更高级的技术。文章不仅涵盖了Python的基础语法,还深入探讨了面向对象编程、函数式编程等高级主题。通过丰富的代码示例和实践项目,读者将能够巩固所学知识,提升编程技能。无论你是编程新手还是有一定经验的开发者,这篇文章都将为你提供有价值的参考和启示。让我们一起踏上Python编程的美妙旅程吧!
|
5月前
|
运维 关系型数据库 MySQL
掌握taskset:优化你的Linux进程,提升系统性能
在多核处理器成为现代计算标准的今天,运维人员和性能调优人员面临着如何有效利用这些处理能力的挑战。优化进程运行的位置不仅可以提高性能,还能更好地管理和分配系统资源。 其中,taskset命令是一个强大的工具,它允许管理员将进程绑定到特定的CPU核心,减少上下文切换的开销,从而提升整体效率。
掌握taskset:优化你的Linux进程,提升系统性能
|
5月前
|
弹性计算 Linux 区块链
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
191 4
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
|
4月前
|
算法 Linux 调度
探索进程调度:Linux内核中的完全公平调度器
【8月更文挑战第2天】在操作系统的心脏——内核中,进程调度算法扮演着至关重要的角色。本文将深入探讨Linux内核中的完全公平调度器(Completely Fair Scheduler, CFS),一个旨在提供公平时间分配给所有进程的调度器。我们将通过代码示例,理解CFS如何管理运行队列、选择下一个运行进程以及如何对实时负载进行响应。文章将揭示CFS的设计哲学,并展示其如何在现代多任务计算环境中实现高效的资源分配。