[雪峰磁针石博客]python库介绍-multiprocessing:多进程

简介: 简介 进程是运行的程序,每个进程有自己的系统状态,包含了内存、打开文件列表、程序计数器(跟踪执行的指令)、存储函数本地调用变量的堆栈。 使用os或subprocess可以创建新进程,比如:os.fork(), subprocess.Popen()。

简介

进程是运行的程序,每个进程有自己的系统状态,包含了内存、打开文件列表、程序计数器(跟踪执行的指令)、存储函数本地调用变量的堆栈。

使用os或subprocess可以创建新进程,比如:os.fork(), subprocess.Popen()。子进程和父进程是相互独立执行的。

interprocess communication (IPC)进程间的通信: 最常见的形式是基于消息传递(message passing)。message是原始字节的缓存,通过I/O channel比如网络socket和管道,使用原语比如send() and recv()来发送接收消息。次常用的有内存映射区:memory-mapped regions,见mmap模块,实际上是共享内存。

线程有自己的控制流和执行堆栈,但是共享系统资源和数据。

并发的难点:同步和数据共享。解决的方法一般是使用互斥锁。


write_lock = Lock()
...
# Critical section where writing occurs
write_lock.acquire()
f.write("Here's some data.\n")
f.write("Here's more data.\n")
...
write_lock.release()

python的并发程序设计

多数系统上,Python支持消息传递和基于线程的并发程序设计。global interpreter lock (the GIL)机制实际每个时间单元只允许单个线程执行,哪怕有多个CPU。如果瓶颈在I/O,使用多线程效果不错;如果在cpu,效果则会更差。还不如使用子进程和消息传递。线程数一多经常出现以下怪异的问题,比如100个线程工作良好,1000个线程就可能出问题了,这种情况一般需要使用异步事件处理系统,比如中央事件循环可能使用select模块监控I/O资源和分发异步到大量的I/O 处理器。asyncore和流行的第三方的Twisted (http://twistedmatrix/com)可以实现这点。

消息传递在python使用很广,甚至在线程中。它难于出错,减少了锁和同步原语的使用。可以扩展至网络和分布式系统。Python的高级特性比如协程序(coroutines)也使用消息传递抽象。

multiprocessing支持子进程、通信和共享数据、执行不同形式的同步。

multiprocessing

Process类

这个类表示子进程中运行的任务:Process([group [, target [, name [, args [, kwargs]]]]]),构造函数中必须使用关键字参数,target表示可调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。Name为别名。Group实质上不使用。

方法有:is_alive()、.join([timeout])、run()、start()、terminate()。

属性有:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。

Process类中,注意daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

创建函数并将其作为单个进程。


import multiprocessing
import time


def clock(interval):
    for i in range(3):
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)


if __name__ == '__main__':
    p = multiprocessing.Process(target=clock, args=(2,))
    p.start()

将进程定义为类:


import multiprocessing
import time


class ClockProcess(multiprocessing.Process):

    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        for i in range(3):
            print("The time is {0}".format(time.ctime()))
            time.sleep(self.interval)

if __name__ == '__main__':
    p = ClockProcess(2)
    p.start()

注意,要在命令行才能执行,用IDE是不行的。

进程通信

multiprocessing支持管道和队列,都是用消息传递来实现的,队列接口和线程中的队列类似。

Queue([maxsize]):默认不限制大小,队列实质是用管道和锁来实现的。支持线程会给底层管道传送数据。

方法有:cancel_join_thread()、close()、empty()、full()、get([block [, timeout]])、get_nowait()(等同于get(False))、join_thread()、put(item [, block [, timeout]])、put_nowait(item)(等同于put(item, False))、qsize()、JoinableQueue([maxsize])、task_done()、join()

下例使用队列进行通信:

JoinableQueue创建连接的进程队列。队列和普通队列基本一样,不过消费者在处理完毕之后可以通知生产者(q.task_done())。使用共享信号和条件变量实现。join()由生产者使用,等待所有成员都收到task_done。


import multiprocessing


def consumer(input_q):
    while True:
        item = input_q.get()
        print(item)
        input_q.task_done()


def producer(sequence, output_q):
    for item in sequence:
        output_q.put(item)

if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    cons_p = multiprocessing.Process(target=consumer, args=(q,))
    cons_p.daemon = True
    cons_p.start()
    sequence = [1, 2, 3, 4]
    producer(sequence, q)
    q.join()

这里控制多进程的关键在于队列get()之后,使用task_done()指示该元素处理完毕;进程启动之前设置了daemon为True;对队列使用join()。

这种方法可以启动多个进程,如下:


    process = []
    key_list = multiprocessing.JoinableQueue()

    # Launch the consumer process
    for i in range(10):
        t = multiprocessing.Process(target=consumer,args=(key_list,lock))
        t.daemon=True
        process.append(t)

    for i in range(10):
        process[i].start()

    producer( key_list )

    key_list.join()

下面有个应用实例:

https://bitbucket.org/china-testing/small_python_daily_tools/src/87d81739633482abdd3a2d0d11f62f6edd989555/db/mysql/check_transfer.py?at=default&fileviewer=file-view-default

在某些程序中,生产者需要告知消费者没有更多项目了,消费者可以关闭了。这时需要使用哨兵(sentinel)。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# multiprocessing_sentinel.py
# Author Rongzhong Xu 2016-08-11 wechat: pythontesting
"""
multiprocessing sentinel demo,
Tesed in python2.7/3.5/2.6

"""
import multiprocessing


def consumer(input_q):

    while True:
        item = input_q.get()
        if item is None:
            break
        # Process item
        print(item)  # Replace with useful work

    # Shutdown
    print("Consumer done")


def producer(sequence, output_q):
    for item in sequence:
        # Put the item on the queue
        output_q.put(item)


if __name__ == '__main__':
    q = multiprocessing.Queue()
    # Launch the consumer process
    cons_p = multiprocessing.Process(target=consumer, args=(q,))
    cons_p.start()
    # Produce items
    sequence = [1, 2, 3, 4]
    producer(sequence, q)
    # Signal completion by putting the sentinel on the queue
    q.put(None)
    # Wait for the consumer process to shutdown
    cons_p.join()

注意:每个消费者都需要一个:sentinel,可以使用for语句来实现


    for i in range(10):
        q.put(None)

实际使用中不局限于使用None,使用其他特殊符号等也是可以的。上面程序从表面看比使用JoinableQueue要复杂,实现的效果又是一样的。实际上这种场景应用更广泛,在consumer比较耗时的情况下,JoinableQueue如果锁住整个函数则互相等待的时间太长,如果不锁,后面几次执行可能丢失数据。

管道

使用管道:Pipe([duplex]),返回值:元组(conn1, conn2)。conn1和conn2为Connection对象,代表管道的末端。管道默认是双向的,如果设置duplex为False,conn1只能接收,conn2只能发送。

Connection对象的方法和属性如下:

close()、fileno()、poll([timeout])、recv()、recv_bytes([maxlength])、recv_bytes_into(buffer [, offset])、send(obj)、send_bytes(buffer [, offset [, size]])

下面例子实现和之前类似的功能:


def consumer(pipe):
    output_p, input_p = pipe
    input_p.close()  # Close the input end of the pipe
    while True:
        try:
            item = output_p.recv()
        except EOFError:
            break
        # Process item
        print(item)  # Replace with useful work
        # Shutdown
    print("Consumer done")

# Produce items and put on a queue. sequence is an
# iterable representing items to be processed.


def producer(sequence, input_p):
    for item in sequence:
        # Put the item on the queue
        input_p.send(item)


if __name__ == '__main__':

    (output_p, input_p) = multiprocessing.Pipe()
    # Launch the consumer process
    cons_p = multiprocessing.Process(
        target=consumer, args=((output_p, input_p),))
    cons_p.start()
    # Close the output pipe in the producer
    output_p.close()
    # Produce items
    sequence = [1, 2, 3, 4]
    producer(sequence, input_p)
    # Signal completion by closing the input pipe
    input_p.close()
    # Wait for the consumer process to shutdown
    cons_p.join()

管道还可以用于双向通信,比如下例的C/S模式:


import multiprocessing

# A server process


def adder(pipe):
    server_p, client_p = pipe
    client_p.close()
    while True:
        try:
            x, y = server_p.recv()
        except EOFError:
            break
        result = x + y
        server_p.send(result)
    # Shutdown
    print("Server done")


if __name__ == '__main__':

    (server_p, client_p) = multiprocessing.Pipe()
    # Launch the server process
    adder_p = multiprocessing.Process(
        target=adder, args=((server_p, client_p),))
    adder_p.start()
    # Close the server pipe in the client
    server_p.close()
    # Make some requests on the server
    client_p.send((3, 4))
    print(client_p.recv())
    client_p.send(('Hello', 'World'))
    print(client_p.recv())
    # Done. Close the pipe
    client_p.close()
    # Wait for the consumer process to shutdown
    adder_p.join()

send()和recv()使用pickle序列化对象。更高级的程序需要使用远程过程调用,需要使用到进程池。

进程池

Pool类在简单的情况下可用于管理固定数量的消费者。进程池的功能和列表解析及函数式编程中的map-reduce类似。


import multiprocessing
import time


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting {0}'.format(multiprocessing.current_process().name))

if __name__ == '__main__':

    # convert range to list for python3
    inputs = list(range(100))

    time1 = time.time()
    builtin_outputs = map(do_calculation, inputs)

    # convert to list for python3
    print('Built-in: {0}'.format(list(builtin_outputs)))
    time2 = time.time()
    print(time2 - time1)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size,
                                initializer=start_process,
                                )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks
    time3 = time.time()
    print('Pool    : {0}'.format(pool_outputs))
    print(time3 - time2)

执行结果:


$ python3 multiprocessing_pool.py 
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
3.790855407714844e-05
Starting ForkPoolWorker-1
Starting ForkPoolWorker-2
Starting ForkPoolWorker-3
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-7
Starting ForkPoolWorker-8
Starting ForkPoolWorker-9
Starting ForkPoolWorker-10
Starting ForkPoolWorker-11
Starting ForkPoolWorker-12
Starting ForkPoolWorker-13
Starting ForkPoolWorker-14
Starting ForkPoolWorker-15
Starting ForkPoolWorker-16
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
0.2203056812286377

上面例子先计算map的时间,然后用进程池的map,计算出时间。在列表数比较少的情况下,多进程的执行时间更短。列表数比较多的情况下,多进程的执行时间更长,可见python内置的map是效率比较高的。

如果消费者函数有内存泄露,可以在执行任务之后重启,设定maxtasksperchild参数即可。


import time


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting {0}'.format(multiprocessing.current_process().name))

if __name__ == '__main__':

    # convert range to list for python3
    inputs = list(range(100))

    time1 = time.time()
    builtin_outputs = map(do_calculation, inputs)

    # convert to list for python3
    print('Built-in: {0}'.format(list(builtin_outputs)))
    time2 = time.time()
    print(time2 - time1)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size,
                                initializer=start_process,
                                maxtasksperchild=3,
                                )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks
    time3 = time.time()
    print('Pool    : {0}'.format(pool_outputs))
    print(time3 - time2)

执行结果:


$ python3 multiprocessing_pool2.py 
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
3.600120544433594e-05
Starting ForkPoolWorker-1
Starting ForkPoolWorker-3
Starting ForkPoolWorker-2
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-7
Starting ForkPoolWorker-8
Starting ForkPoolWorker-9
Starting ForkPoolWorker-10
Starting ForkPoolWorker-11
Starting ForkPoolWorker-12
Starting ForkPoolWorker-13
Starting ForkPoolWorker-14
Starting ForkPoolWorker-15
Starting ForkPoolWorker-16
Starting ForkPoolWorker-17
Starting ForkPoolWorker-18
Starting ForkPoolWorker-19
Starting ForkPoolWorker-20
Starting ForkPoolWorker-21
Starting ForkPoolWorker-22
Starting ForkPoolWorker-23
Starting ForkPoolWorker-24
Starting ForkPoolWorker-25
Starting ForkPoolWorker-26
Starting ForkPoolWorker-27
Starting ForkPoolWorker-28
Starting ForkPoolWorker-29
Starting ForkPoolWorker-30
Starting ForkPoolWorker-31
Starting ForkPoolWorker-32
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
0.23842501640319824

从结果看,进程数有所增加。(注意,进程数似乎比预期的要少)

Pool([numprocess [,initializer [, initargs]]])

numprocess的默认值是cpu_count()。方法有:apply(func [, args [, kwargs]]),apply_async(func [, args [, kwargs [, callback]]]),close(),join(),imap(func, iterable [, chunksize]),imap_unordered(func, iterable [, chunksize]]),map(func, iterable [, chunksize]),map_async(func, iterable [, chunksize [, callback]]),terminate().

返回结果AsyncResult的方法:get([timeout])、ready()、sucessful()、wait([timeout])、wait([timeout])

以下代码生成指定目录的文件名和SHA512对应表的字典。


import multiprocessing
import hashlib
import binascii

# Some parameters you can tweak
BUFSIZE = 8192              # Read buffer size
POOLSIZE = 2                # Number of workers


def compute_digest(filename):
    try:
        f = open(filename, "rb")
    except IOError:
        return None
    digest = hashlib.sha512()
    while True:
        chunk = f.read(BUFSIZE)
        if not chunk:
            break
        digest.update(chunk)
    f.close()
    return filename, digest.digest()


def build_digest_map(topdir):
    digest_pool = multiprocessing.Pool(POOLSIZE)
    allfiles = (os.path.join(path, name)
                for path, dirs, files in os.walk(topdir)
                for name in files)
    digest_map = dict(digest_pool.imap_unordered(compute_digest, allfiles, 20))
    digest_pool.close()
    return digest_map

# Try it out. Change the directory name as desired.
if __name__ == '__main__':
    digest_map = build_digest_map("/home/andrew/data/code/python/\
python-chinese-library/libraries/multiprocessing")
    print(len(digest_map))
    for key in digest_map.keys():
        print("{0}: {1}".format(key, binascii.hexlify(digest_map[key])))

共享数据和同步

共享内存通过mmap实现。共享内存中创建的是ctypes对象,不需要管道中的序列化。

Value(typecode, arg1, ... argN, lock),RawValue(typecode, arg1, ..., argN),Array(typecode, initializer, lock),RawArray(typecode, initializer)

原语有: Lock,Rlock,Semaphore,BoundedSemaphore,Event,Condition.


import multiprocessing


class FloatChannel(object):

    def __init__(self, maxsize):
        self.buffer = multiprocessing.RawArray('d', maxsize)
        self.buffer_len = multiprocessing.Value('i')
        self.empty = multiprocessing.Semaphore(1)
        self.full = multiprocessing.Semaphore(0)

    def send(self, values):
        self.empty.acquire()              # Only proceed if buffer empty
        nitems = len(values)
        self.buffer_len = nitems          # Set the buffer size
        self.buffer[:nitems] = values     # Copy values into the buffer
        self.full.release()               # Signal that buffer is full

    def recv(self):
        self.full.acquire()               # Only proceed if buffer full
        values = self.buffer[:self.buffer_len.value]    # Copy values
        self.empty.release()              # Signal that buffer is empty
        return values

# Performance test. Receive a bunch of messages


def consume_test(count, ch):
    for i in range(count):
        values = ch.recv()

# Performance test. Send a bunch of messages


def produce_test(count, values, ch):
    for i in range(count):
        ch.send(values)

if __name__ == '__main__':

    ch = FloatChannel(100000)
    p = multiprocessing.Process(target=consume_test,
                                args=(1000, ch))
    p.start()
    values = [float(x) for x in range(100000)]
    produce_test(1000, values, ch)
    print("Done")
    p.join()

参考资料

相关文章
|
2天前
|
机器学习/深度学习 编解码 算法
常用的Python库介绍
Python作为一种功能强大的编程语言,拥有众多的第三方库和框架,这些库和框架覆盖了从数据处理、网络编程、Web开发到人工智能等多个领域。
30 15
|
3天前
|
分布式计算 大数据 Java
如何使用Python的pyodps库来进行跨项目空间重命名表名?
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
28 12
|
2天前
|
存储 监控 数据可视化
【Bokeh 库】Python 中的动态数据可视化
【7月更文挑战第15天】Python的Bokeh库是用于动态数据可视化的利器,它能创建交互式、现代Web浏览器兼容的图表。安装Bokeh只需`pip install bokeh`。基础概念包括Plot、Glyph、数据源和工具。通过示例展示了如何用Bokeh创建动态折线图,包括添加HoverTool。Bokeh还支持散点图、柱状图,可自定义样式和布局,添加更多交互工具,并能构建交互式应用和实时数据流更新。适用于数据探索和实时监控。
18 5
|
9天前
|
网络协议 安全 Shell
`nmap`是一个开源的网络扫描工具,用于发现网络上的设备和服务。Python的`python-nmap`库允许我们在Python脚本中直接使用`nmap`的功能。
`nmap`是一个开源的网络扫描工具,用于发现网络上的设备和服务。Python的`python-nmap`库允许我们在Python脚本中直接使用`nmap`的功能。
|
5天前
|
数据采集 搜索推荐 机器人
Python 神器:wxauto 库
Python 神器:wxauto 库
40 1
|
8天前
|
消息中间件 安全 数据处理
Python中的并发编程:理解多线程与多进程的区别与应用
在Python编程中,理解并发编程是提高程序性能和响应速度的关键。本文将深入探讨多线程和多进程的区别、适用场景及实际应用,帮助开发者更好地利用Python进行并发编程。
|
Linux Python
博客链接—Python
001 Import this—Python的设计原则 :http://blog.itpub.net/29067253/viewspace-2072710/ 002 Python问答环节(1):http://blog.
913 0
|
13天前
|
安全 Python
告别低效编程!Python线程与进程并发技术详解,让你的代码飞起来!
【7月更文挑战第9天】Python并发编程提升效率:**理解并发与并行,线程借助`threading`模块处理IO密集型任务,受限于GIL;进程用`multiprocessing`实现并行,绕过GIL限制。示例展示线程和进程创建及同步。选择合适模型,注意线程安全,利用多核,优化性能,实现高效并发编程。
27 3
|
13天前
|
数据采集 大数据 数据安全/隐私保护
Python编程:如何有效等待套接字的读取与关闭
Python网络编程中,套接字事件处理至关重要。利用`selectors`模块和代理IP能增强程序的稳定性和可靠性。代码示例展示了如何通过代理连接目标服务器,注册套接字的读写事件并高效处理。在代理IP配置、连接创建、事件循环及回调函数中,实现了数据收发与连接管理,有效应对网络爬虫或聊天应用的需求,同时保护了真实IP。
Python编程:如何有效等待套接字的读取与关闭
|
4天前
|
Python
告别低效!Python并查集:数据结构界的超级英雄,拯救你的编程人生!
【7月更文挑战第18天】并查集,数据结构超级英雄,用于不相交集合的合并与查询。Python实现包括初始化、查找根节点和合并操作。应用广泛,如社交网络分析、图论问题、集合划分等。示例代码展示了解决岛屿数量问题,统计连通的“1”单元格数。掌握并查集,提升编程效率,解决复杂问题。
20 6