开发者学堂课程【Python入门 2020年版:线程间通信】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/639/detail/10469
线程间通信
内容介绍
一、线程间通信
二、举例说明
一、线程间通信
1.简单介绍
线程之间有时需要通信,操作系统提供了很多机制来实现进程间的通信,其中我们使用最多的是队列 Queue。
2.Queue 的原理
Queue 是一个先进先出 (First in First Out)的队列,主进程中创建一个 Queue 对象,并作为参数传入子进程,两者之间通过 put( )放入数据,通过 get( )取出数据,执行了 get( )涵数之后队列中的数据会被同时删除,可以使用multiprocessing 模块的 Queue 实现多进程之间的数据传递。
import threading
import time
from gqutue import Queue
def producer (queue):
for i in range(100) :
print('{}
存入了
{}" .format(threading.current_thread().name,i))
queue.put(i)
time .sleep(0.1)
Return
def consumer(queue):
二、举例说明
最常见的例子:
生产和消费者,去面包店买面包,一个是生产面包的(生产者),一个是买面包的(消费者),他们要同时操作一个数据。有两个线程一个生产者,一个消费者,他们都来访问面包,生产者增加面包,消费者来取面包,面包减少。
两个线程如何通信,用全局变量会出现线程安全问题。Queue 结构可以在不同的线程间传递数据。
有一个问题消费者一直在消费,面包卖完了,消费者买不到,需要等待。面包生产一个就能买到一个,生产快,面包就要堆积,生产慢,消费者就要等待。
1.d
ef produce():
for i in range (10):
print(‘生产了面包’)
def consumer():
for i in range(10):
print(‘买到了面包’)
p1= threading.Thread(target=produce
,name=’p1’)
(注意:produce 后面不能加括号,加括号是函数调用)
c1=threading.Thread(target=consumer,name=’c1’)
p1.start()
c1.start()
我们的目标是让生产者线程里生产的面包能够让消费者线程拿到。
2.import threading
,queue
#导入queue
def produce():
for i in range (10):
time.sleep(1)
print(
‘生产+++++面包b{}’.format(i))
q.put(‘b{}’.format(i))
def consumer():
for i in range(100):
time.sleep(0.3)
#q.get()
方法是一个阻塞的方法
print(‘买到-----面包{}’.format(q.get)))
q=queue.Queue
#创建全局变量 q
p1= threading.Thread(target=produce
,name=’p1’)
c1=threading.Thread(target=consumer,name=’c1’)
p1.start()
c1.start()
生产一个面包就放一个进队列 b0、b1、b2...b10,为了看得更明显,我们加上 time.sleep。
运行时发现,有一段时间什么都没打印,因为 q.get 的方法是一个阻塞的方法,一上来的时候都要歇,1秒要停,0.3秒要停,肯定先走0.3秒的,但是对列里没有东西可以拿,所以它在等待,会有1秒什么都没打印。
3.对列结构
对列结构很有意思,生产 b0,cpu 此时切换到消费者直接拿走了,紧接着生产了 b1、b2,先拿 b1,又生产 b3、b4,消费者又拿了 b2。这叫“先进先出”。
Stack 栈结构是“后进先出,先进后出”。
4.复杂场景
import threading
,queue
#导入queue
def produce():
for i in range (10):
time.sleep(1)
print(
‘生产+++++面包{}
{}
’.format(threading.current_thread().name,i))
q.put(‘{} {}’.format((threading.current_thread().name,i))
def consumer():
for i in range(100):
time.sleep(0.3)
#q.get()方法是一个阻塞的方法
print(
‘{}
买到-----面包
{}’.format(threading.current_thread().name,q.get)))
q=queue.Queue
#创建全局变量 q
#一条生产线
pa= threading.Thread(target=produce
,name=’p1’)
#一条消费线
ca=threading.Thread(target=consumer,name=’c1’)
pa.start()
ca.start()
运行打印显示
生产+++++面包 pa 0
生产+++++面包 pa 1
ca 买到-----面包 pa 0
生产+++++面包 pa 2
生产+++++面包 pa 3
生产+++++面包 p1 4
ca 买到-----面包 pa 2
生产+++++面包 p1 5
生产+++++面包 p1 6
ca 买到-----面包 pa 3
......
如果多两个生产线,就会严重地供过于求。
import threading
,queue
#导入queue
def produce():
for i in range (10):
time.sleep(1)
print(
‘生产+++++面包{}
{}
’.format(threading.current_thread().name,i))
q.put(‘{} {}’.format((threading.current_thread().name,i))
def consumer():
for i in range(100):
time.sleep(0.3)
#q.get()方法是一个阻塞的方法
print(
‘{}买到-----面包
{}’.format(threading.current_thread().name,q.get)))
q=queue.Queue
#创建全局变量 q
#一条生产线
pa= threading.Thread(target=produce
,name=’p1’)
pb= threading.Thread(target=produce
,name=’p1’)
pc= threading.Thread(target=produce,name=’p1’)
#一条消费线
ca=threading.Thread(target=consumer,name=’c1’)
pa.start()
pb.start()
pc.start()
ca.start()
运行打印显示
生产+++++面包 pb 0
生产+++++面包 pa 0
生产+++++面包 pc 0
生产+++++面包 pb 1
ca 买到-----面包 pb1
生产+++++面包 pa 1
生产+++++面包 pc 1
生产+++++面包 pb 2
生产+++++面包 pa 2
生产+++++面包 pc 2
ca 买到-----面包 pa 0
......
最后的面包没有买完,因为我每个线程生产了10个,只开了一个线程买10次,所以卖不完,直接消费者写个死循环,就可以买完。死循环不用停止,就是等待任务。
def consumer():
while True:
time.sleep(0.3)
#q.get()方法是一个阻塞的方法
print(
‘{}
买到-----面包
{}’.format(threading.current_thread().name,q.get)))
再加两条生产线
ca=threading.Thread(target=consumer,name=
’c1’)
cb=threading.Thread(target=consumer,name=
’c1’)
cc=threading.Thread(target=consumer,name=’c1’)
手机怎么知道你来电话呢,因为后台运行一个死循环一直找对列里有没有任务,有个电话来了,就把消息发到q对列来,通知你电话来了,来一个消息就处理一个。这里不要break,不要停掉。手机里有很多死循环。
例如服务器里的应用,当你注册完了以后,留一个邮箱,会有个点击按钮发送邮件,发邮件这属于是比较耗时的网络操作,如果在主线程里发,发送等待的5-10s 基本是卡死的状态,可以把任务交给子线程,后面会讲。
注意:不能在主线程里死循环。