Python 并发编程(一)之线程

简介: 常用用法 t.is_alive() Python中线程会在一个单独的系统级别线程中执行(比如一个POSIX线程或者一个Windows线程)这些线程将由操作系统来全权管理。线程一旦启动,将独立执行直到目标函数返回。

常用用法

t.is_alive()

Python中线程会在一个单独的系统级别线程中执行(比如一个POSIX线程或者一个Windows线程)
这些线程将由操作系统来全权管理。线程一旦启动,将独立执行直到目标函数返回。可以通过查询
一个线程对象的状态,看它是否还在执行t.is_alive()

t.join()

可以把一个线程加入到当前线程,并等待它终止
Python 解释器在所有线程都终止后才继续执行代码剩余的部分

daemon

对于需要长时间运行的线程或者需要一直运行的后台任务,可以用后台线程(也称为守护线程)

例:
t = Thread(target = func, args(1,), daemon = True)
t.start()

后台线程无法等待,这些线程会在主线程终止时自动销毁

小结:
后台线程无法等待,不过,这些线程会在主线程终止时自动销毁。你无法结束一个线程,无法给它发送信
号,无法调整它的调度,也无法执行其他高级操作。如果需要这些特性,你需要自己添加。比如说,
如果你需要终止线程,那么这个线程必须通过编程在某个特定点轮询来退出

如果线程执行一些像 I/O 这样的阻塞操作,那么通过轮询来终止线程将使得线程之间的协调变得非常棘手。
比如,如果一个线程一直阻塞在一个 I/O 操作上,它就永远无法返回,也就无法检查自己是否已经被结束了。
要正确处理这些问题,需要利用超时循环来小心操作线程。

线程间通信

queue

一个线程向另外一个线程发送数据最安全的方式应该就是queue库中的队列
先看一下使用例子,这里是一个简单的生产者和消费者模型:

 1 from queue import Queue
 2 from threading import Thread
 3 import random
 4 import time
 5 
 6 
 7 _sentinel = object()
 8 
 9 
10 def producer(out_q):
11     n = 10
12     while n:
13         time.sleep(1)
14         data = random.randint(0, 10)
15         out_q.put(data)
16         print("生产者生产了数据{0}".format(data))
17         n -= 1
18     out_q.put(_sentinel)
19 
20 
21 def consumer(in_q):
22     while True:
23         data = in_q.get()
24         print("消费者消费了{0}".format(data))
25         if data is _sentinel:
26             in_q.put(_sentinel)
27             break
28 
29 
30 q = Queue()
31 t1 = Thread(target=consumer, args=(q,))
32 t2 = Thread(target=producer, args=(q,))
33 
34 t1.start()
35 t2.start()

上述代码中设置了一个特殊值_sentinel用于当获取到这个值的时候终止执行
关于queue的功能有个需要注意的地方:
Queue对象虽然已经包含了必要的锁,主要有q.put和q.get
而q.size(),q.full(),q.empty()等方法不是线程安全的

使用队列进行线程通信是一个单向、不确定的过程。通常情况下,是没有办法知道接收数据的线程是什么时候接收到的数据并开始工作的。但是队列提供了一些基本的特性:q.task_done()和q.join()

如果一个线程需要在另外一个线程处理完特定的数据任务后立即得到通知,可以把要发送的数据和一个Event放到一起使用

关于线程中的Event

线程有一个非常关键的特性:每个线程都是独立运行的,且状态不可预测
如果程序中的其他线程需要通过判断每个线程的状态来确定自己下一步的操作,这时线程同步问题就会比较麻烦。
解决方法:
使用threading库中的Event
Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。
在初始化状态下,event对象中的信号标志被设置为假。
如果有线程等待一个event对象,而这个event的标志为假,这个线程将一直被阻塞知道该标志为真。
一个线程如果把event对象的标志设置为真,就会唤醒所有等待这个event对象的线程。

通过一个代码例子理解:

 1 from threading import Thread, Event
 2 import time
 3 
 4 
 5 def countdown(n, started_evt):
 6     print("countdown starting")
 7     # set将event的标识设置为True
 8     started_evt.set()
 9     while n > 0:
10         print("T-mins", n)
11         n -= 1
12         time.sleep(2)
13 
14 # 初始化的started_evt为False
15 started_evt = Event()
16 print("Launching countdown")
17 t = Thread(target=countdown, args=(10, started_evt,))
18 t.start()
19 # 会一直等待直到event的标志为True的时候
20 started_evt.wait()
21 print("countdown is running")

而结果,我们也可以看出当线程执行了set之后,才打印running

实际用event对象最好是单次使用,创建一个event对象,让某个线程等待这个对象,一旦对象被设置为Tru,就应该丢弃它,我们虽然可以通过clear()方法重置event对象,但是这个没法确保安全的清理event对象并对它进行重新的赋值。会发生错过事件,死锁等各种问题。
event对象的一个重要特点是它被设置为True时会唤醒所有等待它的线程,如果唤醒单个线程的最好用Condition或信号量Semaphore

和event功能类似的线程中还有一个Condition

关于线程中的Condition

关于Condition官网的一段话:
A condition variable is always associated with some kind of lock; this can be passed in or one will be created by default. Passing one in is useful when several condition variables must share the same lock. The lock is part of the condition object: you don’t have to track it separately.

Other methods must be called with the associated lock held. The wait() method releases the lock, and then blocks until another thread awakens it by calling notify() or notify_all(). Once awakened, wait() re-acquires the lock and returns. It is also possible to specify a timeout.

但是需要注意的是: 
notify() and notify_all()这两个方法,不会释放锁,这意味着线程或者被唤醒的线程不会立刻执行wait()

我们可以通过Conditon对象实现一个周期定时器的功能,每当定时器超时的时候,其他线程都可以检测到,代码例子如下:

 1 import threading
 2 import time
 3 
 4 
 5 class PeriodicTimer:
 6     """
 7     这里做了一个定时器
 8     """
 9 
10     def __init__(self, interval):
11         self._interval = interval
12         self._flag = 0
13         self._cv = threading.Condition()
14 
15     def start(self):
16         t = threading.Thread(target=self.run)
17         t.daemon = True
18         t.start()
19 
20     def run(self):
21         while True:
22             time.sleep(self._interval)
23             with self._cv:
24                 # 这个点还是非常有意思的^=
25                 self._flag ^= 1
26                 self._cv.notify_all()
27 
28     def wait_for_tick(self):
29         with self._cv:
30             last_flag = self._flag
31 
32             while last_flag == self._flag:
33                 self._cv.wait()
34 
35 
36 # 下面两个分别为两个需要定时执行的任务
37 def countdown(nticks):
38     while nticks > 0:
39         ptimer.wait_for_tick()
40         print('T-minus', nticks)
41         nticks -= 1
42 
43 
44 def countup(last):
45     n = 0
46     while n < last:
47         ptimer.wait_for_tick()
48         print('Counting', n)
49         n += 1
50 
51 
52 ptimer = PeriodicTimer(5)
53 ptimer.start()
54 
55 threading.Thread(target=countdown, args=(10,)).start()
56 threading.Thread(target=countup, args=(5,)).start()

关于线程中锁的使用

要在多线程中安全使用可变对象,需要使用threading库中的Lock对象
先看一个关于锁的基本使用:

 1 import threading
 2 
 3 
 4 class SharedCounter:
 5 
 6     def __init__(self, initial_value=0):
 7         self._value = initial_value
 8         self._value_lock = threading.Lock()
 9 
10 
11     def incr(self,delta = 1):
12         with self._value_lock:
13             self._value += delta
14 
15     def decr(self, delta=1):
16         with self._value_lock:
17             self._value -= delta

Lock对象和with语句块一起使用可以保证互斥执行,这样每次就只有一个线程可以执行with语句包含的代码块。with语句会在这个代码快执行前自动获取锁,在执行结束后自动释放所。

线程的调度本质上是不确定的,因此,在多线程程序中错误的使用锁机制可能会导致随机数据
损坏或者其他异常错误,我们称之为竞争条件

你可能看到有些“老python程序员”
还是通过_value_lock.acquire() 和_value_lock.release(),明显看来
还是with更加方便,不容易出错,毕竟你无法保证那次就忘记释放锁了

为了避免死锁,使用锁机制的程序应该设定每个线程一次只能获取一个锁

threading库中还提供了其他的同步原语:RLock,Semaphore对象。但是这两个使用场景相对来说比较特殊
RLock(可重入锁)可以被同一个线程多次获取,主要用来实现基于检测对象模式的锁定和同步。在使用这种锁的时候,当锁被持有时,只有一个线程可以使用完整的函数或者类中的方法,例子如下:

 1 import threading
 2 
 3 
 4 class SharedCounter:
 5 
 6     _lock = threading.RLock()
 7 
 8     def __init__(self,initial_value=0):
 9         self._value = initial_value
10 
11     def incr(self,delta=1):
12 
13         with SharedCounter._lock:
14             self._value += delta
15 
16     def decr(self,delta=1):
17 
18         with SharedCounter._lock:
19             self.incr(-delta)

这个例子中的锁是一个类变量,也就是所有实例共享的类级锁,这样就保证了一次只有一个线程可以调用这个类的方法。与标准锁不同的是已经持有这个锁的方法再调用同样适用这个锁的方法时,无需再次获取锁,例如上面例子中的decr方法。
这种方法的特点是:无论这个类有多少实例都使用一个锁。因此在需要使用大量使用计数器的情况下内存效率更高。
缺点:在程序中使用大量线程并频繁更新计数器时会有竞争用锁的问题。

信号量对象是一个建立在共享计数器基础上的同步原语,如果计数器不为0,with语句讲计数器减1,
线程被允许执行。with语句执行结束后,计数器加1。如果计数器为0,线程将被阻塞,直到其他线程结束并将计数器加1。但是信号量不推荐使用,增加了复杂性,影响程序性能。
所以信号量更适用于哪些需要在线程之间引入信号或者限制的程序。例如限制一段代码的并发量

 1 from threading import Semaphore
 2 import requests
 3 
 4 
 5 _fetch_url_sema = Semaphore(5)
 6 
 7 
 8 def fetch_url(url):
 9     with _fetch_url_sema:
10         return requests.get(url)

关于防止死锁的加锁机制

在多线程程序中,死锁问题很大一部分是由于多线程同时获取多个锁造成的。
举个例子:一个线程获取一个第一个锁,在获取第二个锁的时候发生阻塞,那么这个线程就可能阻塞其他线程执行,从而导致整个程序假死。

一种解决方法:为程序中每一个锁分配一个唯一的id,然后只允许按照升序规则来使用多个锁。

 1 import threading
 2 from contextlib import contextmanager
 3 
 4 # 存储已经请求锁的信息
 5 _local = threading.local()
 6 
 7 
 8 @contextmanager
 9 def acquire(*locks):
10     # 把锁通过id进行排序
11     locks = sorted(locks, key=lambda x: id(x))
12 
13     acquired = getattr(_local, 'acquired', [])
14 
15     if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
16         raise RuntimeError("Lock order Violation")
17     acquired.extend(locks)
18     _local.acquired = acquired
19 
20     try:
21         for lock in locks:
22             lock.acquire()
23         yield
24     finally:
25         for lock in reversed(locks):
26             lock.release()
27         del acquired[-len(locks):]
28 
29 
30 x_lock = threading.Lock()
31 y_lock = threading.Lock()
32 
33 
34 def thread_1():
35     while True:
36         with acquire(x_lock,y_lock):
37             print("Thread-1")
38 
39 
40 def thread_2():
41     while True:
42         with acquire(y_lock,x_lock):
43             print("Thread-2")
44 
45 
46 t1 = threading.Thread(target=thread_1)
47 t1.daemon = True
48 t1.start()
49 
50 t2 = threading.Thread(target=thread_2)
51 t2.daemon = True
52 t2.start()

通过排序,不管以什么样的顺序来请求锁,这些锁都会按照固定的顺序被获取。
这里也用了thread.local()来保存请求锁的信息
同样的这个东西也可以用来保存线程的信息,而这个线程对其他的线程是不可见的

所有的努力都值得期许,每一份梦想都应该灌溉!
目录
相关文章
|
16天前
|
并行计算 安全 Java
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
在Python开发中,GIL(全局解释器锁)一直备受关注。本文基于CPython解释器,探讨GIL的技术本质及其对程序性能的影响。GIL确保同一时刻只有一个线程执行代码,以保护内存管理的安全性,但也限制了多线程并行计算的效率。文章分析了GIL的必要性、局限性,并介绍了多进程、异步编程等替代方案。尽管Python 3.13计划移除GIL,但该特性至少要到2028年才会默认禁用,因此理解GIL仍至关重要。
83 16
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
|
3天前
|
Python
python3多线程中使用线程睡眠
本文详细介绍了Python3多线程编程中使用线程睡眠的基本方法和应用场景。通过 `time.sleep()`函数,可以使线程暂停执行一段指定的时间,从而控制线程的执行节奏。通过实际示例演示了如何在多线程中使用线程睡眠来实现计数器和下载器功能。希望本文能帮助您更好地理解和应用Python多线程编程,提高程序的并发能力和执行效率。
32 20
|
1月前
|
安全 Java 程序员
面试直击:并发编程三要素+线程安全全攻略!
并发编程三要素为原子性、可见性和有序性,确保多线程操作的一致性和安全性。Java 中通过 `synchronized`、`Lock`、`volatile`、原子类和线程安全集合等机制保障线程安全。掌握这些概念和工具,能有效解决并发问题,编写高效稳定的多线程程序。
66 11
|
1月前
|
数据采集 消息中间件 Java
python并发编程:什么是并发编程?python对并发编程有哪些支持?
并发编程能够显著提升程序的效率和响应速度。例如,网络爬虫通过并发下载将耗时从1小时缩短至20分钟;APP页面加载时间从3秒优化到200毫秒。Python支持多线程、多进程、异步I/O和协程等并发编程方式,适用于不同场景。线程通信方式包括共享变量、消息传递和同步机制,如Lock、Queue等。Python的并发编程特性使其在处理大规模数据和高并发访问时表现出色,成为许多领域的首选语言。
|
3月前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
353 6
|
3月前
|
数据采集 存储 数据处理
Python中的多线程编程及其在数据处理中的应用
本文深入探讨了Python中多线程编程的概念、原理和实现方法,并详细介绍了其在数据处理领域的应用。通过对比单线程与多线程的性能差异,展示了多线程编程在提升程序运行效率方面的显著优势。文章还提供了实际案例,帮助读者更好地理解和掌握多线程编程技术。
|
3月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
3月前
|
API 数据处理 Python
探秘Python并发新世界:asyncio库,让你的代码并发更优雅!
在Python编程中,随着网络应用和数据处理需求的增长,并发编程变得愈发重要。asyncio库作为Python 3.4及以上版本的标准库,以其简洁的API和强大的异步编程能力,成为提升性能和优化资源利用的关键工具。本文介绍了asyncio的基本概念、异步函数的定义与使用、并发控制和资源管理等核心功能,通过具体示例展示了如何高效地编写并发代码。
64 2
|
3月前
|
Java Unix 调度
python多线程!
本文介绍了线程的基本概念、多线程技术、线程的创建与管理、线程间的通信与同步机制,以及线程池和队列模块的使用。文章详细讲解了如何使用 `_thread` 和 `threading` 模块创建和管理线程,介绍了线程锁 `Lock` 的作用和使用方法,解决了多线程环境下的数据共享问题。此外,还介绍了 `Timer` 定时器和 `ThreadPoolExecutor` 线程池的使用,最后通过一个具体的案例展示了如何使用多线程爬取电影票房数据。文章还对比了进程和线程的优缺点,并讨论了计算密集型和IO密集型任务的适用场景。
147 4
|
3月前
|
设计模式 安全 Java
Java 多线程并发编程
Java多线程并发编程是指在Java程序中使用多个线程同时执行,以提高程序的运行效率和响应速度。通过合理管理和调度线程,可以充分利用多核处理器资源,实现高效的任务处理。本内容将介绍Java多线程的基础概念、实现方式及常见问题解决方法。
193 0

热门文章

最新文章