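Concurrent programming is an important part of modern software development, playing a key role in handling heavy I/O workloads, improving application responsiveness, and raising resource utilization. Python, a high-level language widely used in scientific computing, web development, data analysis, and other fields, offers several concurrency models: threads, processes, and the higher-level coroutines. This article explores all three models, with particular attention to their synchronization techniques, combining explanation with hands-on code examples to help readers master the art of concurrent programming in Python.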
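1. Concurrency Basics
Concurrency is a program's ability to make progress on multiple tasks over the same period of time. This does not mean that all tasks execute at the same instant (which is impossible on a single-core processor). Rather, techniques such as time slicing, multiple processors, or asynchronous I/O make the tasks appear to run simultaneously. Concurrency improves resource utilization and lets a program handle tasks more efficiently.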
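1.1 Concurrency vs. Parallelism
- Concurrency: tasks progress simultaneously at the macro level, but may take turns executing at the micro level.
- Parallelism: tasks execute simultaneously at the micro level, which usually requires a multi-core processor.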
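To make the distinction concrete, here is a minimal sketch that times three simulated I/O waits run one after another and then on three threads. The helper names `fake_io`, `run_sequential`, and `run_threaded` are illustrative assumptions, not part of any library. In a standard CPython build with the global interpreter lock, the threads overlap their waiting rather than execute Python bytecode in parallel, so this shows concurrency rather than parallelism.

```python
import threading
import time

def fake_io(delay):
    # Stand-in for a blocking I/O call; while sleeping, other threads can run.
    time.sleep(delay)

def run_sequential(n, delay):
    # Run n waits back to back and return the elapsed wall-clock time.
    start = time.perf_counter()
    for _ in range(n):
        fake_io(delay)
    return time.perf_counter() - start

def run_threaded(n, delay):
    # Run n waits on n threads and return the elapsed wall-clock time.
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    print(f"sequential: {run_sequential(3, 0.2):.2f}s")
    print(f"threaded:   {run_threaded(3, 0.2):.2f}s")
```

The threaded run should take roughly as long as a single wait, because the three waits overlap.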
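2. Threads in Python
A thread is the smallest unit of execution that the operating system can schedule. In Python, threads are created and managed with the threading module.
2.1 Creating Threads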
import threading
import time

def thread_function(name):
    print(f"Thread {name}: starting")
    time.sleep(2)
    print(f"Thread {name}: finishing")

if __name__ == "__main__":
    threads = list()
    for index in range(3):
        thread = threading.Thread(target=thread_function, args=(index,))
        threads.append(thread)
        thread.start()
    # Wait for every thread to finish before the main thread continues
    for thread in threads:
        thread.join()
    print("Main thread finished")
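When threads need to return results, managing Thread objects by hand becomes tedious. As one possible alternative, the standard library's `concurrent.futures.ThreadPoolExecutor` takes care of starting and joining the threads. The sketch below is an illustration only: `fetch_length` is a hypothetical stand-in for real I/O work, and `run_pool` is a helper name assumed for this example.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_length(text):
    # Hypothetical task: pretend to wait on I/O, then return a result.
    time.sleep(0.1)
    return len(text)

def run_pool(items, max_workers=3):
    # Leaving the with-block waits for all submitted work to finish.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in the same order as the inputs.
        return list(pool.map(fetch_length, items))

if __name__ == "__main__":
    print(run_pool(["a", "bb", "ccc"]))
```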
2.2 Synchronization: Locks and Condition Objects
Locks
To prevent data inconsistency when multiple threads modify a shared resource at the same time, use a Lock.
import threading
import time  # needed for time.sleep below

counter = 0
lock = threading.Lock()

def increment():
    global counter
    # Only one thread at a time may run this read-modify-write sequence
    with lock:
        local_counter = counter
        local_counter += 1
        time.sleep(0.1)  # simulate I/O latency; the lock serializes these sleeps
        counter = local_counter

threads = [threading.Thread(target=increment) for _ in range(100)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(f"Counter: {counter}")
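A common way to keep the lock and the data it protects together is to wrap both in a small class. The following is a minimal sketch under that assumption. `SafeCounter`, `hammer`, and `run_demo` are names invented for this example.

```python
import threading

class SafeCounter:
    """Illustrative counter that guards its value with a Lock."""

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        # The read-modify-write below forms one critical section.
        with self._lock:
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value

def hammer(counter, times):
    # Each thread increments the shared counter `times` times.
    for _ in range(times):
        counter.increment()

def run_demo(num_threads=8, times=10_000):
    counter = SafeCounter()
    threads = [
        threading.Thread(target=hammer, args=(counter, times))
        for _ in range(num_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value

if __name__ == "__main__":
    print(f"Counter: {run_demo()}")
```

Callers never touch the lock directly, which makes it harder to forget to acquire it.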
Condition Objects
A Condition object lets one or more threads wait until some condition is met.
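import threading
import time

condition = threading.Condition()
workers = 5  # number of work slots currently free

def worker(num):
    global workers  # required: the function rebinds the module-level name
    with condition:
        # wait() releases the lock while blocked and reacquires it on wake-up
        while workers == 0:
            print(f"No work for worker {num}, waiting...")
            condition.wait()
        workers -= 1
        print(f"Worker {num} starts working")
    time.sleep(0.1)  # simulate the work outside the lock
    with condition:
        workers += 1  # hand the slot back
        condition.notify()  # wake one waiting worker

threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()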
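Another typical use of a Condition is a producer and consumer sharing a buffer. The sketch below assumes a single producer and a single consumer. The function `run_producer_consumer` and the `done` flag are illustrative choices, not a prescribed pattern.

```python
import threading
from collections import deque

def run_producer_consumer(items):
    buffer = deque()
    condition = threading.Condition()
    consumed = []
    done = False  # set by the producer once everything has been queued

    def producer():
        nonlocal done
        for item in items:
            with condition:
                buffer.append(item)
                condition.notify()  # wake the consumer if it is waiting
        with condition:
            done = True
            condition.notify_all()

    def consumer():
        while True:
            with condition:
                # Re-check the predicate in a loop: wait() may return
                # while the buffer is still empty.
                while not buffer and not done:
                    condition.wait()
                if not buffer and done:
                    return
                item = buffer.popleft()
            consumed.append(item)  # handle the item outside the lock

    threads = [threading.Thread(target=consumer), threading.Thread(target=producer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed

if __name__ == "__main__":
    print(run_producer_consumer(["a", "b", "c"]))
```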
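3. Processes in Python
A process is the smallest unit of resource allocation and has its own independent memory space. Python's multiprocessing module provides the tools for creating and managing processes.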
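The independent memory space can be observed directly. In this minimal sketch, a child process increments a module-level variable and the parent's copy stays unchanged. The names `counter` and `bump` are assumptions made for the example.

```python
from multiprocessing import Process

counter = 0  # each process works on its own copy of this variable

def bump():
    global counter
    counter += 1
    print(f"child sees counter = {counter}")

if __name__ == "__main__":
    p = Process(target=bump)
    p.start()
    p.join()
    # The child's increment does not reach the parent's memory.
    print(f"parent sees counter = {counter}")
```

This isolation is why processes need explicit communication channels to exchange data.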
3.1 Inter-Process Communication
Using Queue
from multiprocessing import Process, Queue

def worker(q):
    item = q.get()  # blocks until an item is available
    print(f'Processing {item}')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    q.put(1)
    p.join()
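The same idea extends to several workers that keep pulling items until told to stop. The following is a sketch under stated assumptions: `SENTINEL`, `square`, and `parallel_squares` are names made up for this example, and a second queue carries the results back to the parent.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical marker meaning "no more work"

def square(x):
    return x * x

def worker(tasks, results):
    # Pull items until the sentinel arrives, then exit cleanly.
    while True:
        item = tasks.get()
        if item is SENTINEL:
            break
        results.put((item, square(item)))

def parallel_squares(numbers, num_workers=2):
    tasks, results = Queue(), Queue()
    procs = [Process(target=worker, args=(tasks, results)) for _ in range(num_workers)]
    for p in procs:
        p.start()
    for n in numbers:
        tasks.put(n)
    for _ in procs:
        tasks.put(SENTINEL)  # one sentinel per worker
    # Drain the results before joining so no worker blocks on a full queue.
    collected = dict(results.get() for _ in numbers)
    for p in procs:
        p.join()
    return [collected[n] for n in numbers]

if __name__ == "__main__":
    print(parallel_squares([1, 2, 3, 4]))
```

Results can arrive in any order, so each one is tagged with its input and reassembled at the end.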
Using Pipe
from multiprocessing import Process, Pipe

def sender(conn, msgs):
    for msg in msgs:
        conn.send(msg)
    conn.close()

def receiver(conn):
    while True:
        msg = conn.recv()
        if msg == 'END':  # sentinel that ends the conversation
            break
        print(f"Received: {msg}")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=sender, args=(child_conn, ['Hello', 'World', 'END']))
    p.start()
    receiver(parent_conn)
    p.join()
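By default `Pipe()` returns a duplex pair, so each end can both send and receive. The sketch below uses that for a simple request and reply exchange. `shout`, `echo_server`, and `ask_all` are hypothetical names, and `None` is assumed as the stop signal.

```python
from multiprocessing import Process, Pipe

def shout(text):
    return text.upper() + "!"

def echo_server(conn):
    # Answer each request until the stop signal (None) arrives.
    while True:
        msg = conn.recv()
        if msg is None:
            break
        conn.send(shout(msg))
    conn.close()

def ask_all(messages):
    parent_conn, child_conn = Pipe()  # duplex: both ends can send and recv
    p = Process(target=echo_server, args=(child_conn,))
    p.start()
    replies = []
    for msg in messages:
        parent_conn.send(msg)
        replies.append(parent_conn.recv())  # wait for the matching reply
    parent_conn.send(None)  # tell the child to stop
    p.join()
    parent_conn.close()
    return replies

if __name__ == "__main__":
    print(ask_all(["hello", "world"]))
```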
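4. Coroutines in Python
Coroutines are often described as lightweight threads: they switch between tasks cooperatively within a single thread, which avoids the overhead of thread context switches. Python supports coroutines through the asyncio module.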
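One way to see that coroutines share a single thread is to record the thread identifier inside each task. This is a minimal sketch, and `report_thread` is a name invented for the example.

```python
import asyncio
import threading

async def report_thread(name):
    await asyncio.sleep(0)  # yield control to the event loop once
    return name, threading.get_ident()

async def main():
    results = await asyncio.gather(*(report_thread(f"task-{i}") for i in range(3)))
    # Collect the distinct thread identifiers seen by the tasks.
    return {ident for _, ident in results}

if __name__ == "__main__":
    idents = asyncio.run(main())
    print(f"distinct threads used: {len(idents)}")
```

All three tasks should report the same identifier, because the event loop runs them on one thread.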
4.1 Asynchronous Programming Basics
import asyncio

async def my_coroutine():
    print('Coroutine started.')
    await asyncio.sleep(1)  # suspends this coroutine without blocking the thread
    print('Coroutine finished.')

async def main():
    task = asyncio.create_task(my_coroutine())
    await task

asyncio.run(main())
4.2 Using the asyncio Event Loop and Tasks
import asyncio

async def my_task(number):
    print(f'Task {number} started')
    await asyncio.sleep(1)
    print(f'Task {number} finished')

async def main():
    tasks = []
    for i in range(5):
        # create_task schedules the coroutine on the running event loop
        tasks.append(asyncio.create_task(my_task(i)))
    await asyncio.gather(*tasks)

asyncio.run(main())
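`asyncio.gather` also collects return values, in the order the awaitables were passed rather than the order they finish. The sketch below illustrates this with a hypothetical `delayed_square` coroutine and measures the total time.

```python
import asyncio
import time

async def delayed_square(n, delay):
    await asyncio.sleep(delay)
    return n * n

async def main():
    start = time.perf_counter()
    # The delays differ, yet gather() returns results in argument order.
    results = await asyncio.gather(
        delayed_square(1, 0.3),
        delayed_square(2, 0.1),
        delayed_square(3, 0.2),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```

The elapsed time should be close to the longest single delay, not the sum of all three.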
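4.3 Asynchronous Shared State and Locks
Although coroutines run within a single thread, shared state still needs a synchronization mechanism whenever a task can be suspended at an await between reading and writing that state.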
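import asyncio
from types import SimpleNamespace

async def increment(counter, lock):
    async with lock:
        num = counter.value + 1
        await asyncio.sleep(0.01)  # simulated I/O delay between read and write
        counter.value = num

async def main():
    # asyncio has no Value type; a plain object with a value attribute
    # is enough because everything runs in one thread
    counter = SimpleNamespace(value=0)
    lock = asyncio.Lock()
    tasks = [increment(counter, lock) for _ in range(100)]
    await asyncio.gather(*tasks)
    print(f"Final count: {counter.value}")

asyncio.run(main())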
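To see why the lock matters, it helps to compare the same update with and without it. The following is a sketch for illustration. `unsafe_increment`, `safe_increment`, and `run` are assumed names, and `asyncio.sleep(0)` stands in for any await that lets other tasks run.

```python
import asyncio

async def unsafe_increment(state):
    value = state["count"]
    await asyncio.sleep(0)  # suspension point: other tasks run here
    state["count"] = value + 1  # may overwrite updates made meanwhile

async def safe_increment(state, lock):
    async with lock:
        value = state["count"]
        await asyncio.sleep(0)  # still suspends, but the lock stays held
        state["count"] = value + 1

async def run(n, use_lock):
    state = {"count": 0}
    lock = asyncio.Lock()
    if use_lock:
        jobs = [safe_increment(state, lock) for _ in range(n)]
    else:
        jobs = [unsafe_increment(state) for _ in range(n)]
    await asyncio.gather(*jobs)
    return state["count"]

if __name__ == "__main__":
    print("without lock:", asyncio.run(run(100, use_lock=False)))
    print("with lock:   ", asyncio.run(run(100, use_lock=True)))
```

Without the lock, tasks read the same stale value before any of them writes, so updates are lost. With the lock, every increment is counted.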
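5. Summary
Python's concurrency models give developers a range of choices, from threads and processes to coroutines, each suited to different scenarios. Understanding the core concepts behind these models, how they differ, and how to synchronize them is key to improving application performance. With the code examples above, readers should be better equipped to apply threads, processes, and coroutines flexibly and to handle the challenges that arise in concurrent programs.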