实战
什么是GIL ( global interpreter lock ): 全局解释锁
Python中的一个线程对应于c语言当中的一个线程;因为python语言在前期为了简单,在进行编程的时候,会在解释器上面加一个非常大的锁;它允许我们一次只有一个线程运行在我们的CPU上。
学习多线程,希望大家能够了解2点:
1、python在多线程中为什么有人会觉得它慢? ---> 字节码 - 使得同一时刻只能有一个线程在一个CPU上面执行字节码
2、是不是多线程真的就很慢?--->(这个要分情况,后面会进行验证)
- dis库是python(默认的CPython)自带的一个库,可以用来分析字节码
import dis
# 反编译 将函数变成字节码的流程
def tony(t):
t = t+1
return t
print(dis.dis(tony))
gil在某些情况下是可以被释放掉的--> 这个是python内部的策略问题;所以GIL锁会释放。
- 第一种情况释放:gil会根据执行的字节码行数以及时间片进行释放gil
- 第二种情况释放:程序会在遇到io操作的时候 主动释放
什么是时间片? 时间片的概念是什么?
时间片即CPU分配给各个程序的时间,每个线程被分配一个时间段,称作它的时间片,即该进程允许运行的时间,使各个程序从表面上看是同时进行的。如果在时间片结束时进程还在运行,则CPU将被剥夺并分配给另一个进程。如果进程在时间片结束前阻塞或结束,则CPU当即进行切换。而不会造成CPU资源浪费。
- 在宏观上:我们可以同时打开多个应用程序,每个程序并行不悖,同时运行。
- 在微观上:由于只有一个CPU,一次只能处理程序要求的一部分,如何处理公平,一种方法就是引入时间片,每个程序轮流执行。--->{时间片轮回算法}
a = 0
def time():
'''
1. thread_001
2. io操作 没想象中那么严,会主动释放的
3. thread_002
:return:
'''
global a # LEGB
for item in range(1000000):
a += 1
def test():
global a
for item in range(1000000):
a -= 1
import threading
thread_1 = threading.Thread(target=time)
thread_2 = threading.Thread(target=test)
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()
print(a)
threading多线程-守护线程
class GetDataHtml(threading.Thread):
def __init__(self,name):
super().__init__(name=name)
def run(self):
print('开始获取数据html的时间')
time.sleep(2)
print('获取数据html结束的时间')
class GetDataUrl(threading.Thread):
def __init__(self,name):
super().__init__(name=name)
def run(self):
print('开始获取数据url的时间')
time.sleep(2)
print('获取数据url结束的时间')
if __name__ == '__main__':
thread_one=GetDataHtml('tony')
thread_two=GetDataUrl('老师')
start_time = time.time()
thread_one.start()
thread_two.start()
# print("中间运行时间:{}".format(time.time()-start_time))
# 阻塞
thread_one.join()
thread_two.join()
print("中间运行时间:{}".format(time.time()-start_time))
线程间的通讯-共享变量
queue
线程间的通讯
共享变量
import time
import threading
data_list = []
def get_data_html():
global data_list
for url in data_list:
print('开始获取数据html的时间')
time.sleep(2)
print('获取数据html结束的时间')
def get_data_url():
global data_list
print('开始获取数据url的时间')
time.sleep(3)
for item in range(30):
data_list.append('http://logic.org/{}id'.format(item))
print('获取数据url结束的时间')
if __name__ == '__main__':
thread_url = threading.Thread(target=get_data_url)
thread_1 = threading.Thread(target=get_data_html)
thread_2 = threading.Thread(target=get_data_html)
thread_3 = threading.Thread(target=get_data_html)
thread_4 = threading.Thread(target=get_data_html)
thread_5 = threading.Thread(target=get_data_html)
start_time = time.time()
thread_url.start()
thread_1.start()
thread_2.start()
thread_3.start()
thread_4.start()
thread_5.start()
import time
import threading
# from '第七讲-Tony老师' import test_tool
data_list = []
def get_data_html(data_list):
# global data_list
while True:
if len(data_list):
url=data_list.pop()
# for url in data_list:
print('开始获取数据html的时间')
time.sleep(2)
print('获取数据html结束的时间')
def get_data_url(data_list):
# global data_list
while True:
print('开始获取数据url的时间')
time.sleep(3)
for item in range(30):
data_list.append('http://logic.org/{id}'.format(id=item))
print('获取数据url结束的时间')
if __name__ == '__main__':
thread_url = threading.Thread(target=get_data_url,args=(data_list,))
for item in range(10):
thread_html = threading.Thread(target=get_data_html,args=(data_list,))
thread_html.start()
start_time = time.time()
thread_url.start()
print("中间运行时间:{}".format(time.time()-start_time))
Queue
1.共享变量
2.queue队列 - 它本身是安全的 - 引用了 deque 双端队列
import time
import threading
from queue import Queue
def get_data_html(queue):
# global data_list
while True:
url = queue.get()
print('开始获取数据html的时间')
time.sleep(2)
print('获取数据html结束的时间')
def get_data_url(queue):
# global data_list
while True:
print('开始获取数据url的时间')
time.sleep(3)
for item in range(30):
# 添加
queue.put('http://logic.org/{id}'.format(id=item))
print('获取数据url结束的时间')
if __name__ == '__main__':
# 设置队列
data_url_queue = Queue(maxsize=1000)
thread_url = threading.Thread(target=get_data_url,args=(data_url_queue,))
for item in range(10):
thread_html = threading.Thread(target=get_data_html,args=(data_url_queue,))
thread_html.start()
start_time = time.time()
# 成对出现的
data_url_queue.join()
data_url_queue.task_done()
# thread_url.start()
print("中间运行时间:{}".format(time.time()-start_time))
Lock,Rlock
1.线程间的通讯问题
2.线程同步问题 - 为了解决结果不一致
'''
'''
import dis
def time(a):
a += 1
def test(a):
a -= 1
'''
0 LOAD_FAST 0 (a)
2 LOAD_CONST 1 (1)
4 INPLACE_ADD + 1
6 STORE_FAST 0 (a)
'''
print(dis.dis(time))
print(dis.dis(test))
Module_thread_lock.py
'''
A(a,b)
acquire(a)
acquire(b)
资源竞争 - 互相等待
B(a,b)
acquire(b)
acquire(a)
'''
from threading import Lock
a = 0
lock = Lock()
def time():
global a
global lock
for item in range(1000000):
# 上锁
lock.acquire()
a += 1
# 释放锁
lock.release()
def test():
global a
global lock
for item in range(1000000):
# 上锁
lock.acquire()
a -= 1
# 释放锁
lock.release()
import threading
thread_1 = threading.Thread(target=time)
thread_2 = threading.Thread(target=test)
thread_1.start()
thread_2.start()
thread_1.join()
thread_1.join()
print(a)
结论:
1.用锁会影响性能
2.锁会引起死锁 资源竞争 - 互相等待
RLock - 可重入锁
'''
# '''
# import dis
#
#
# def time(a):
# a += 1
#
#
# def test(a):
# a -= 1
# '''
# 0 LOAD_FAST 0 (a)
# 2 LOAD_CONST 1 (1)
# 4 INPLACE_ADD + 1
# 6 STORE_FAST 0 (a)
# '''
#
# print(dis.dis(time))
# print(dis.dis(test))
'''
A(a,b)
acquire(a)
acquire(b)
资源竞争 - 互相等待
B(a,b)
acquire(b)
acquire(a)
'''
from threading import Lock, RLock
a = 0
lock = RLock()
def time():
global a
global lock
for item in range(1000000):
# 上锁
lock.acquire()
# do_time(lock)
lock.acquire()
a += 1
# 释放锁
lock.release()
lock.release()
def do_time(lock):
# 上锁
lock.acquire()
# 0---------------------
# 释放锁
lock.release()
def test():
global a
global lock
for item in range(1000000):
# 上锁
lock.acquire()
a -= 1
# 释放锁
lock.release()
import threading
thread_1 = threading.Thread(target=time)
thread_2 = threading.Thread(target=test)
thread_1.start()
thread_2.start()
thread_1.join()
thread_1.join()
print(a)
Condition(条件变量):主要用于复杂线程间同步的一个锁。 ---> 通过condition完成双方协同聊天的功能
wait 和 notify 是condition的精髓。
在调用with cond 之后才能调用 wait 或者 notify方法。 PS: 必须要在with语句下面,才能够成功;否则会报错。
在condition中 有2层锁, 一把底层锁会在线程调用wait方法的时候释放,上面的锁会在每次调用wait的时候分配一把并放入cond中的等待队列中;直到notify方法的唤醒。
import threading
from threading import Condition
'''
张三 : 你好,很高兴认识你 A
马六 : 我也是 B
张三 : 你是什么样的人
马六 : 我是憨憨
张三 : 。。。
马六 : 。。。。。。
'''
class ZhangSan(threading.Thread):
def __init__(self, lock):
super().__init__(name='张三')
self.lock = lock
def run(self):
lock.acquire()
print("{} : 你好,很高兴认识你".format(self.name))
lock.release()
# lock.acquire()
# print("{} : 你是什么样的人".format(self.name))
# lock.release()
class MaLiu(threading.Thread):
def __init__(self, lock):
self.lock = lock
super().__init__(name='马六')
def run(self):
lock.acquire()
print("{} : 我也是".format(self.name))
lock.release()
# lock.acquire()
# print("{} : 我是憨憨".format(self.name))
# lock.release()
if __name__ == '__main__':
lock = threading.Lock()
zhang_san = ZhangSan(lock)
ma_liu = MaLiu(lock)
zhang_san.start()
ma_liu.start()
Condition
import threading
from threading import Condition
'''
张三 : 你好,很高兴认识你 A
马六 : 我也是 B
张三 : 你是什么样的人
马六 : 我是憨憨
张三 : 。。。
马六 : 。。。。。。
'''
class ZhangSan(threading.Thread):
def __init__(self, cond):
super().__init__(name='张三')
self.cond = cond
def run(self):
with self.cond:
# self.cond.acquire()
print("{} : 你好,很高兴认识你".format(self.name))
self.cond.notify()
self.cond.wait()
print("{} : 你是什么样的人".format(self.name))
self.cond.notify()
self.cond.wait()
# self.cond.release()
class MaLiu(threading.Thread):
def __init__(self, cond):
self.cond = cond
super().__init__(name='马六')
def run(self):
with self.cond:
# self.cond.acquire()
self.cond.wait()
print("{} : 我也是".format(self.name))
self.cond.notify()
self.cond.wait()
print("{} : 我是憨憨".format(self.name))
self.cond.notify()
# self.cond.release()
if __name__ == '__main__':
cond = threading.Condition()
zhang_san = ZhangSan(cond)
ma_liu = MaLiu(cond)
# 程序的执行顺序
ma_liu.start()
zhang_san.start()
Semaphore (用于控制进入数量的锁)
我们的线程对于操作系统来说,如果线程越多,我们操作系统的切换就会越慢;所以在某些时候我们想要控制线程并发数量的时候,Semaphore的意义是非常重要的;
'''
读和写
Semaphroe():用于控制进入的数量的锁, 信号量
'''
import threading
# threading.Semaphore()
import time
class GetUrl(threading.Thread):
def __init__(self, sem):
super().__init__()
self.sem = sem
def run(self):
# list.append()
for item in range(20):
# 上锁
self.sem.acquire()
html_thread = HtmlSpider('http://news.baidu.com/{}'.format(item), self.sem)
html_thread.start()
class HtmlSpider(threading.Thread):
def __init__(self, url, sem):
super().__init__()
self.url = url
self.sem = sem
def run(self):
time.sleep(2)
print("获取了一页html详情信息!")
# 释放锁
self.sem.release()
if __name__ == '__main__':
# 只允许并发3个
sem = threading.Semaphore(3)
url_threads = GetUrl(sem)
url_threads.start()