每周一个 Python 模块 | threading

简介: 其实在 Python 中,多线程是不推荐使用的,除非明确不支持使用多进程的场景,否则的话,能用多进程就用多进程吧。写这篇文章的目的,可以对比多进程的文章来看,有很多相通的地方,看完也许会对并发编程有更好的理解。

其实在 Python 中,多线程是不推荐使用的,除非明确不支持使用多进程的场景,否则的话,能用多进程就用多进程吧。写这篇文章的目的,可以对比多进程的文章来看,有很多相通的地方,看完也许会对并发编程有更好的理解。


GIL


Python(特指 CPython)的多线程的代码并不能利用多核的优势,而是通过著名的全局解释锁(GIL)来进行处理的。如果是一个计算型的任务,使用多线程 GIL 就会让多线程变慢。我们举个计算斐波那契数列的例子:


# coding=utf-8
import time
import threading
def profile(func):
    def wrapper(*args, **kwargs):
        import time
        start = time.time()
        func(*args, **kwargs)
        end   = time.time()
        print 'COST: {}'.format(end - start)
    return wrapper
def fib(n):
    if n<= 2:
        return 1
    return fib(n-1) + fib(n-2)
@profile
def nothread():
    fib(35)
    fib(35)
@profile
def hasthread():
    for i in range(2):
        t = threading.Thread(target=fib, args=(35,))
        t.start()
    main_thread = threading.currentThread()
    for t in threading.enumerate():
        if t is main_thread:
            continue
        t.join()
nothread()
hasthread()
# output
# COST: 5.05716490746
# COST: 6.75599503517
复制代码


运行的结果你猜猜会怎么样?还不如不用多线程!

GIL 是必须的,这是 Python 设计的问题:Python 解释器是非线程安全的。这意味着当从线程内尝试安全的访问Python 对象的时候将有一个全局的强制锁。 在任何时候,仅仅一个单一的线程能够获取 Python 对象或者 C API。每 100 个字节的 Python 指令解释器将重新获取锁,这(潜在的)阻塞了 I/O 操作。因为锁,CPU 密集型的代码使用线程库时,不会获得性能的提高(但是当它使用之后介绍的多进程库时,性能可以获得提高)。

那是不是由于 GIL 的存在,多线程库就是个「鸡肋」呢?当然不是。事实上我们平时会接触非常多的和网络通信或者数据输入/输出相关的程序,比如网络爬虫、文本处理等等。这时候由于网络情况和 I/O 的性能的限制,Python 解释器会等待读写数据的函数调用返回,这个时候就可以利用多线程库提高并发效率了。


线程对象


先说一个非常简单的方法,直接使用 Thread 来实例化目标函数,然后调用 start() 来执行。


import threading
def worker():
    """thread worker function"""
    print('Worker')
threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()
# output
# Worker
# Worker
# Worker
# Worker
# Worker
复制代码


生成线程时可以传递参数给线程,什么类型的参数都可以。下面这个例子只传了一个数字:


import threading
def worker(num):
    """thread worker function"""
    print('Worker: %s' % num)
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
# output
# Worker: 0
# Worker: 1
# Worker: 2
# Worker: 3
# Worker: 4
复制代码


还有一种创建线程的方法,通过继承 Thread 类,然后重写 run() 方法,代码如下:


import threading
import logging
class MyThread(threading.Thread):
    def run(self):
        logging.debug('running')
logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)
for i in range(5):
    t = MyThread()
    t.start()
# output
# (Thread-1  ) running
# (Thread-2  ) running
# (Thread-3  ) running
# (Thread-4  ) running
# (Thread-5  ) running
复制代码


因为传递给 Thread 构造函数的参数 argskwargs 被保存成了带 __ 前缀的私有变量,所以在子线程中访问不到,所以在自定义线程类中,要重新构造函数。


import threading
import logging
class MyThreadWithArgs(threading.Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        super().__init__(group=group, target=target, name=name,
                         daemon=daemon)
        self.args = args
        self.kwargs = kwargs
    def run(self):
        logging.debug('running with %s and %s',
                      self.args, self.kwargs)
logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)
for i in range(5):
    t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'})
    t.start()
# output
# (Thread-1  ) running with (0,) and {'b': 'B', 'a': 'A'}
# (Thread-2  ) running with (1,) and {'b': 'B', 'a': 'A'}
# (Thread-3  ) running with (2,) and {'b': 'B', 'a': 'A'}
# (Thread-4  ) running with (3,) and {'b': 'B', 'a': 'A'}
# (Thread-5  ) running with (4,) and {'b': 'B', 'a': 'A'}
复制代码


确定当前线程


每个 Thread 都有一个名称,可以使用默认值,也可以在创建线程时指定。


import threading
import time
def worker():
    print(threading.current_thread().getName(), 'Starting')
    time.sleep(0.2)
    print(threading.current_thread().getName(), 'Exiting')
def my_service():
    print(threading.current_thread().getName(), 'Starting')
    time.sleep(0.3)
    print(threading.current_thread().getName(), 'Exiting')
t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker)  # use default name
w.start()
w2.start()
t.start()
# output
# worker Starting
# Thread-1 Starting
# my_service Starting
# worker Exiting
# Thread-1 Exiting
# my_service Exiting
复制代码


守护线程


默认情况下,在所有子线程退出之前,主程序不会退出。有些时候,启动后台线程运行而不阻止主程序退出是有用的,例如为监视工具生成“心跳”的任务。

要将线程标记为守护程序,在创建时传递 daemon=True 或调用set_daemon(True),默认情况下,线程不是守护进程。


import threading
import time
import logging
def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')
def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')
logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()
# output
# (daemon    ) Starting
# (non-daemon) Starting
# (non-daemon) Exiting
复制代码


输出不包含守护线程的 Exiting,因为在守护线程从 sleep() 唤醒之前,其他线程,包括主程序都已经退出了。

如果想等守护线程完成工作,可以使用 join() 方法。


import threading
import time
import logging
def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')
def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')
logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()
d.join()
t.join()
# output
# (daemon    ) Starting
# (non-daemon) Starting
# (non-daemon) Exiting
# (daemon    ) Exiting
复制代码


输出信息已经包括守护线程的 Exiting

默认情况下,join()无限期地阻止。也可以传一个浮点值,表示等待线程变为非活动状态的秒数。如果线程未在超时期限内完成,则join()无论如何都会返回。


import threading
import time
import logging
def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')
def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')
logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()
d.join(0.1)
print('d.isAlive()', d.isAlive())
t.join()
# output
# (daemon    ) Starting
# (non-daemon) Starting
# (non-daemon) Exiting
# d.isAlive() True
复制代码


由于传递的超时小于守护程序线程休眠的时间,因此join() 返回后线程仍处于“活动”状态。


枚举所有线程


enumerate() 方法可以返回活动 Thread 实例列表。由于该列表包括当前线程,并且由于加入当前线程会引入死锁情况,因此必须跳过它。


import random
import threading
import time
import logging
def worker():
    """thread worker function"""
    pause = random.randint(1, 5) / 10
    logging.debug('sleeping %0.2f', pause)
    time.sleep(pause)
    logging.debug('ending')
logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)
for i in range(3):
    t = threading.Thread(target=worker, daemon=True)
    t.start()
main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is main_thread:
        continue
    logging.debug('joining %s', t.getName())
    t.join()
# output
# (Thread-1  ) sleeping 0.20
# (Thread-2  ) sleeping 0.30
# (Thread-3  ) sleeping 0.40
# (MainThread) joining Thread-1
# (Thread-1  ) ending
# (MainThread) joining Thread-3
# (Thread-2  ) ending
# (Thread-3  ) ending
# (MainThread) joining Thread-2
复制代码


计时器线程


Timer() 在延迟时间后开始工作,并且可以在该延迟时间段内的任何时间点取消。


import threading
import time
import logging
def delayed():
    logging.debug('worker running')
logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)
t1 = threading.Timer(0.3, delayed)
t1.setName('t1')
t2 = threading.Timer(0.3, delayed)
t2.setName('t2')
logging.debug('starting timers')
t1.start()
t2.start()
logging.debug('waiting before canceling %s', t2.getName())
time.sleep(0.2)
logging.debug('canceling %s', t2.getName())
t2.cancel()
logging.debug('done')
# output
# (MainThread) starting timers
# (MainThread) waiting before canceling t2
# (MainThread) canceling t2
# (MainThread) done
# (t1        ) worker running
复制代码

此示例中的第二个计时器不会运行,并且第一个计时器似乎在主程序完成后运行的。由于它不是守护线程,因此在完成主线程时会隐式调用它。


同步机制


Semaphore


在多线程编程中,为了防止不同的线程同时对一个公用的资源(比如全部变量)进行修改,需要进行同时访问的数量(通常是 1)。信号量同步基于内部计数器,每调用一次 acquire(),计数器减 1;每调用一次 release(),计数器加 1。当计数器为 0 时,acquire() 调用被阻塞。


import logging
import random
import threading
import time
class ActivePool:
    def __init__(self):
        super(ActivePool, self).__init__()
        self.active = []
        self.lock = threading.Lock()
    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
            logging.debug('Running: %s', self.active)
    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            logging.debug('Running: %s', self.active)
def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:
        name = threading.current_thread().getName()
        pool.makeActive(name)
        time.sleep(0.1)
        pool.makeInactive(name)
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)
pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
    t = threading.Thread(
        target=worker,
        name=str(i),
        args=(s, pool),
    )
    t.start()
# output
# 2016-07-10 10:45:29,398 (0 ) Waiting to join the pool
# 2016-07-10 10:45:29,398 (0 ) Running: ['0']
# 2016-07-10 10:45:29,399 (1 ) Waiting to join the pool
# 2016-07-10 10:45:29,399 (1 ) Running: ['0', '1']
# 2016-07-10 10:45:29,399 (2 ) Waiting to join the pool
# 2016-07-10 10:45:29,399 (3 ) Waiting to join the pool
# 2016-07-10 10:45:29,501 (1 ) Running: ['0']
# 2016-07-10 10:45:29,501 (0 ) Running: []
# 2016-07-10 10:45:29,502 (3 ) Running: ['3']
# 2016-07-10 10:45:29,502 (2 ) Running: ['3', '2']
# 2016-07-10 10:45:29,607 (3 ) Running: ['2']
# 2016-07-10 10:45:29,608 (2 ) Running: []
复制代码


在这个例子中,ActivePool() 类只是为了展示在同一时刻,最多只有两个线程在运行。


Lock


Lock 也可以叫做互斥锁,其实相当于信号量为 1。我们先看一个不加锁的例子:


import time
from threading import Thread
value = 0
def getlock():
    global value
    new = value + 1
    time.sleep(0.001)  # 使用sleep让线程有机会切换
    value = new
threads = []
for i in range(100):
    t = Thread(target=getlock)
    t.start()
    threads.append(t)
for t in threads:
    t.join()
print value # 16
复制代码


不加锁的情况下,结果会远远的小于 100。那我们加上互斥锁看看:


import time
from threading import Thread, Lock
value = 0
lock = Lock()
def getlock():
    global value
    with lock:
        new = value + 1
        time.sleep(0.001)
        value = new
threads = []
for i in range(100):
    t = Thread(target=getlock)
    t.start()
    threads.append(t)
for t in threads:
    t.join()
print value # 100
复制代码


RLock


acquire() 能够不被阻塞的被同一个线程调用多次。但是要注意的是 release() 需要调用与 acquire() 相同的次数才能释放锁。

先看一下使用 Lock 的情况:


import threading
lock = threading.Lock()
print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))
# output
# First try : True
# Second try: False
复制代码


在这种情况下,第二次调用将 acquire() 被赋予零超时以防止它被阻塞,因为第一次调用已获得锁定。

再看看用RLock替代的情况。


import threading
lock = threading.RLock()
print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))
# output
# First try : True
# Second try: True
复制代码


Condition


一个线程等待特定条件,而另一个线程发出特定条件满足的信号。最好说明的例子就是「生产者/消费者」模型:


import time
import threading
def consumer(cond):
    t = threading.currentThread()
    with cond:
        cond.wait()  # wait()方法创建了一个名为waiter的锁,并且设置锁的状态为locked。这个waiter锁用于线程间的通讯
        print '{}: Resource is available to consumer'.format(t.name)
def producer(cond):
    t = threading.currentThread()
    with cond:
        print '{}: Making resource available'.format(t.name)
        cond.notifyAll()  # 释放waiter锁,唤醒消费者
condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))
c1.start()
time.sleep(1)
c2.start()
time.sleep(1)
p.start()
# output
# p: Making resource available
# c2: Resource is available to consumer
# c1: Resource is available to consumer
复制代码


可以看到生产者发送通知之后,消费者都收到了。


Event


一个线程发送/传递事件,另外的线程等待事件的触发。我们同样的用「生产者/消费者」模型的例子:


# coding=utf-8
import time
import threading
from random import randint
TIMEOUT = 2
def consumer(event, l):
    t = threading.currentThread()
    while 1:
        event_is_set = event.wait(TIMEOUT)
        if event_is_set:
            try:
                integer = l.pop()
                print '{} popped from list by {}'.format(integer, t.name)
                event.clear()  # 重置事件状态
            except IndexError:  # 为了让刚启动时容错
                pass
def producer(event, l):
    t = threading.currentThread()
    while 1:
        integer = randint(10, 100)
        l.append(integer)
        print '{} appended to list by {}'.format(integer, t.name)
        event.set()  # 设置事件
        time.sleep(1)
event = threading.Event()
l = []
threads = []
for name in ('consumer1', 'consumer2'):
    t = threading.Thread(name=name, target=consumer, args=(event, l))
    t.start()
    threads.append(t)
p = threading.Thread(name='producer1', target=producer, args=(event, l))
p.start()
threads.append(p)
for t in threads:
    t.join()
# output
# 77 appended to list by producer1
# 77 popped from list by consumer1
# 46 appended to list by producer1
# 46 popped from list by consumer2
# 43 appended to list by producer1
# 43 popped from list by consumer2
# 37 appended to list by producer1
# 37 popped from list by consumer2
# 33 appended to list by producer1
# 33 popped from list by consumer2
# 57 appended to list by producer1
# 57 popped from list by consumer1
复制代码


可以看到事件被 2 个消费者比较平均的接收并处理了。如果使用了 wait() 方法,线程就会等待我们设置事件,这也有助于保证任务的完成。


Queue


队列在并发开发中最常用的。我们借助「生产者/消费者」模式来理解:生产者把生产的「消息」放入队列,消费者从这个队列中对去对应的消息执行。

大家主要关心如下 4 个方法就好了:

  1. put: 向队列中添加一个项。
  2. get: 从队列中删除并返回一个项。
  3. task_done: 当某一项任务完成时调用。
  4. join: 阻塞直到所有的项目都被处理完。
# coding=utf-8
import time
import threading
from random import random
from Queue import Queue
q = Queue()
def double(n):
    return n * 2
def producer():
    while 1:
        wt = random()
        time.sleep(wt)
        q.put((double, wt))
def consumer():
    while 1:
        task, arg = q.get()
        print arg, task(arg)
        q.task_done()
for target in(producer, consumer):
    t = threading.Thread(target=target)
    t.start()
复制代码


这就是最简化的队列架构。

Queue 模块还自带了 PriorityQueue(带有优先级)和 LifoQueue(后进先出)2 种特殊队列。我们这里展示下线程安全的优先级队列的用法,PriorityQueue 要求我们 put 的数据的格式是(priority_number, data),我们看看下面的例子:


import time
import threading
from random import randint
from Queue import PriorityQueue
q = PriorityQueue()
def double(n):
    return n * 2
def producer():
    count = 0
    while 1:
        if count > 5:
            break
        pri = randint(0, 100)
        print 'put :{}'.format(pri)
        q.put((pri, double, pri))  # (priority, func, args)
        count += 1
def consumer():
    while 1:
        if q.empty():
            break
        pri, task, arg = q.get()
        print '[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg))
        q.task_done()
        time.sleep(0.1)
t = threading.Thread(target=producer)
t.start()
time.sleep(1)
t = threading.Thread(target=consumer)
t.start()
# output
# put :84
# put :86
# put :16
# put :93
# put :14
# put :93
# [PRI:14] 14 * 2 = 28
# 
# [PRI:16] 16 * 2 = 32
# [PRI:84] 84 * 2 = 168
# [PRI:86] 86 * 2 = 172
# [PRI:93] 93 * 2 = 186
# [PRI:93] 93 * 2 = 186
复制代码


其中消费者是故意让它执行的比生产者慢很多,为了节省篇幅,只随机产生 5 次随机结果。可以看到 put 时的数字是随机的,但是 get 的时候先从优先级更高(数字小表示优先级高)开始获取的。


线程池


面向对象开发中,大家知道创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。无节制的创建和销毁线程是一种极大的浪费。那我们可不可以把执行完任务的线程不销毁而重复利用呢?仿佛就是把这些线程放进一个池子,一方面我们可以控制同时工作的线程数量,一方面也避免了创建和销毁产生的开销。

线程池在标准库中其实是有体现的,只是在官方文章中基本没有被提及:


In : from multiprocessing.pool import ThreadPool
In : pool = ThreadPool(5)
In : pool.map(lambda x: x**2, range(5))
Out: [0, 1, 4, 9, 16]
复制代码


当然我们也可以自己实现一个:


# coding=utf-8
import time
import threading
from random import random
from Queue import Queue
def double(n):
    return n * 2
class Worker(threading.Thread):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self._q = queue
        self.daemon = True
        self.start()
    def run(self):
        while 1:
            f, args, kwargs = self._q.get()
            try:
                print 'USE: {}'.format(self.name)  # 线程名字
                print f(*args, **kwargs)
            except Exception as e:
                print e
            self._q.task_done()
class ThreadPool(object):
    def __init__(self, num_t=5):
        self._q = Queue(num_t)
        # Create Worker Thread
        for _ in range(num_t):
            Worker(self._q)
    def add_task(self, f, *args, **kwargs):
        self._q.put((f, args, kwargs))
    def wait_complete(self):
        self._q.join()
pool = ThreadPool()
for _ in range(8):
    wt = random()
    pool.add_task(double, wt)
    time.sleep(wt)
pool.wait_complete()
# output
# USE: Thread-1
# 1.58762376489
# USE: Thread-2
# 0.0652918738849
# USE: Thread-3
# 0.997407997138
# USE: Thread-4
# 1.69333900685
# USE: Thread-5
# 0.726900613676
# USE: Thread-1
# 1.69110052253
# USE: Thread-2
# 1.89039743989
# USE: Thread-3
# 0.96281118122
复制代码


线程池会保证同时提供 5 个线程工作,但是我们有 8 个待完成的任务,可以看到线程按顺序被循环利用了。


目录
相关文章
|
22天前
|
开发者 Python
函数与模块:编写高效的Python代码
【4月更文挑战第8天】本文介绍了Python中提升代码效率和可读性的关键——函数和模块。函数是可重复调用的代码段,用于封装逻辑,减少重复,提高结构清晰度。通过`def`定义函数,使用`return`返回值,支持位置、关键字、默认和不定长参数。模块是包含Python代码的文件,用于组织代码,可导入使用。通过`import`导入模块,创建自定义模块以分解大型项目。熟悉Python标准库中的模块能提升开发效率。掌握函数和模块的使用对编写高效、易维护的代码至关重要。
|
4天前
|
人工智能 安全 Java
Python 多线程编程实战:threading 模块的最佳实践
Python 多线程编程实战:threading 模块的最佳实践
119 5
|
4天前
|
人工智能 数据库 开发者
Python中的atexit模块:优雅地处理程序退出
Python中的atexit模块:优雅地处理程序退出
8 3
|
7天前
|
存储 开发者 Python
Python中的argparse模块:命令行参数解析的利器
Python中的argparse模块:命令行参数解析的利器
16 2
|
7天前
|
开发者 Python
Python的os模块详解
Python的os模块详解
15 0
|
10天前
|
数据挖掘 API 数据安全/隐私保护
python请求模块requests如何添加代理ip
python请求模块requests如何添加代理ip
|
11天前
|
测试技术 Python
Python 有趣的模块之pynupt——通过pynput控制鼠标和键盘
Python 有趣的模块之pynupt——通过pynput控制鼠标和键盘
|
12天前
|
Serverless 开发者 Python
《Python 简易速速上手小册》第3章:Python 的函数和模块(2024 最新版)
《Python 简易速速上手小册》第3章:Python 的函数和模块(2024 最新版)
40 1
|
13天前
|
开发者 Python
Python中的并发编程:使用asyncio模块实现异步任务
传统的Python编程中,使用多线程或多进程进行并发操作时,常常会面临性能瓶颈和资源竞争的问题。而随着Python 3.5版本的引入,asyncio模块为开发者提供了一种基于协程的异步编程方式。本文将介绍如何使用asyncio模块实现异步任务,提高Python程序的并发处理能力。
|
13天前
|
测试技术 Python
Python 的自动化测试:如何使用 Python 的 unittest 模块进行测试?
在Python中进行自动化测试可利用`unittest`模块。以下是一个简单的示例,展示了如何编写测试用例
10 0