线程
线程的特点
多个线程共享同一个进程的地址空间中的资源,是对一台计算机上多个进程的模拟,有时也称线程为轻量级的进程。而对一台计算机上多个进程,则共享物理内存、磁盘、打印机等其他物理资源。
多线程的运行也多进程的运行类似,是cpu在多个线程之间的快速切换。
线程和python全局解释器锁GIL
Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。
对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
在多线程环境中,Python 虚拟机按以下方式执行:
a、设置 GIL;
b、切换到一个线程去运行;
c、运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));
d、把线程设置为睡眠状态;
e、解锁 GIL;
d、再次重复以上所有步骤。
在调用外部代码(如 C/C++扩展函数)的时候,GIL将会被锁定,直到这个函数结束为止(由于在这期间没有Python的字节码被运行,所以不会做线程切换)编写扩展的程序员可以主动解锁GIL。
全局解释器GIL锁就是同一时间只能有一个线程去访问cpu,但是这并不是说python的效率就很低,因为只有在涉及到大量的计算需要cpu来计算数据的时候,这时候才涉及到线程去访问cpu,这种情况下可以说python的效率要比别的语言低。所以在这个时候我们一般采用多进程
但是在别的场景I/O很多的情况下,不涉及很多的cpu计算,这种情况下是可以多线程运行的。
全局解释器GIL锁只是在CPYTHON解释器中才会有,在别的解释器中是不存在的。
线程的创建threading模块
守护线程问题
from threading import Thread
import time
def func1():
while True:
time.sleep(1)
print('hahaha') def func2(): time.sleep(5) print('子线程2') t1 = Thread(target=func1) t1.setDaemon(True) t1.start() Thread(target=func2).start() print('主线程')
####
主线程
hahaha
hahaha
hahaha
hahaha
子线程2
说明:守护线程会再非守护线程全部运行结束后才结束。
原因:因为主线程的结束意味着主进程结束,而子线程还没有结束,也就是说这时候主线程也还不能结束,守护线程也不能结束,当子线程结束后,守护线程结束,主线程结束,主进程也就结束了,然后就回收
解释:
1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕 后回收子进程的资源(否则会产生僵尸进程),才会结束, 2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束, 进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
同步锁
from threading import Thread
import os,time
def work():
global n
temp=n
time.sleep(0.1) n=temp-1 if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #结果可能为99
#这个结果为什么是99:因为每个线程在work函数中每次通过cpu计算出来的结果还要重新对n赋值,在写入内存中
同步锁
from threading import Thread,Lock
import os,time
def work():
global n
lock.acquire()
temp=n
time.sleep(0.1) n=temp-1 lock.release() if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全
死锁(Lock)与递归锁(RLock)
死锁也叫互斥锁,就是在统一线程中,被同一个锁中的多个acquire 阻塞住。
from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
###阻塞住,打印不出123
解决办法:用递归锁在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
noodle_lock.acquire()
print('%s 抢到了面条'%name)
fork_lock.acquire()
print('%s 抢到了叉子'%name) print('%s 吃面'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 抢到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 抢到了面条' % name) print('%s 吃面' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','egon','yuan']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
信号量
线程的信号量和进程的信号量一样,内部都有一个计数器,每当acquire 内部计数器就减1 当计数器为0的时候就阻塞,当release一次就加1 这时候就可以再执行一个进程。
from threading import Thread,Semaphore
import threading
import time
# def func():
# if sm.acquire():
# print (threading.currentThread().getName() + ' get semaphore')
# time.sleep(2)
# sm.release()
def func():
sm.acquire()
print('%s get sm' %threading.current_thread().getName())
time.sleep(3) sm.release() if __name__ == '__main__': sm=Semaphore(5) for i in range(23): t=Thread(target=func) t.start()
最大线程数为5
信号量和池的区别就是:信号量会开启大量的线程,但是每次执行线程数是一定的。
线程池 因为线程池的数量是一定的,所以开启的线程数和执行的进程数永远是固定的
线程队列
queue队列 :使用import queue,用法与进程Queue一样
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
-
class
queue.
Queue
(maxsize=0) #先进先出
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 结果(先进先出): first second third '''
class queue.
LifoQueue
(maxsize=0) #last in fisrt out 后进先出
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 结果(后进先出): third second first '''
class queue.
PriorityQueue
(maxsize=0) #存储数据时可设置优先级的队列
import queue q=queue.PriorityQueue() #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 结果(数字越小优先级越高,优先级高的优先出队): (10, 'b') (20, 'a') (30, 'c') '''
线程池
import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
def func(i):
print(i*'*')
time.sleep(1)
return i**2
def callb(arg): print(arg.result()*'-') if __name__ == '__main__': # thread_pool = ThreadPoolExecutor(5) thread_pool = ThreadPoolExecutor(5) # ret_lst = [] for i in range(1,11): thread_pool.submit(func,i).add_done_callback(callb) # 相当于apply_async # ret = thread_pool.submit(func,i).add_done_callback(callable) # 相当于apply_async # ret_lst.append(ret) thread_pool.shutdown() # close+join # for ret in ret_lst: # print(ret.result()) print('wahaha') # 回调函数 # 进程池 # 线程池 # 当内存不需要共享,且高计算的时候 用进程 # 当内存需要共享,且高IO的时候 用线程 # 当并发很大的时候 # 多进程 : 多个任务 —— 进程池 :cpu个数、cpu个数+1 # 多线程 :多个任务 —— 线程池 :cpu个数*5 # 4C : 4个、5个进程 —— 20条线程/进程 : 80-100个任务