# 进程只是资源单位 # 进程内部的代码才是线程 # 区别: # 1.不同进程之间数据是不共享的 # 1.同一进程下不同线程之间的数据是共享的 # 2.进程开销远大于线程 # 3.pid # from threading import Thread # import time # # def task(name): # time.sleep(2) # print("%s is running"%(name)) # # if __name__ == "__main__": # t1 = Thread(target=task,args=("线程一",)) # t1.start() # t1.join() # print("主线程") # 线程的属性 # from threading import Thread # import os # from multiprocessing import current_process #跟os.getpid()一样 # from threading import currentThread,active_count # class MyThread(Thread): # def __init__(self,name): # super().__init__() # self.name = name # # def run(self): # print("%s is running"%(self.name),current_process().pid) # # if __name__ == "__main__": # t1 = MyThread("线程一") # t1.start() # t1.setName("儿子线程一") #设置线程名字 # t1.is_alive() # t1.join() # t2 = MyThread("线程二") # t2.start() # currentThread().setName("baba线程") # 主函数内线程改名字 # # print("主线程",os.getpid()) # print(currentThread().getName()) # 主函数内线程获取名字 # print(active_count()) # 获取活跃的线程数量 #守护线程 # 守护线程,会跟着主线程一起死,主线程会等着非守护线程死 # 非守护线程结束,主线程结束,守护线程立刻死亡 # from threading import Thread,currentThread # import time,os # # def task(name): # print("%s is running"%name) # time.sleep(3) # print("%s is done"%name) # # if __name__ == "__main__": # t1 = Thread(target=task,args=("子线程一",)) # t1.start() # t2 = Thread(target=task,args=("守护线程",)) # time.sleep(3) # # t2.daemon = True # 这样方法一开启线程 # t2.setDaemon(True) # 方法二开启守护线程 # t2.start() # currentThread().setName("我是线程老大") # print("%s is running"%(currentThread().getName())) # # 互斥锁 # from threading import Thread,Lock # import time # # n = 100 # def task(mutex): # # global n # mutex.acquire() # 加锁 # temp = n # time.sleep(0.1) # n = temp -1 # mutex.release() # 释放锁 # if __name__ == "__main__": # t_list = [] # mutex = Lock() # for i in range(100): # t = Thread(target=task,args=(mutex,)) # t.start() # t_list.append(t) # for i in t_list: # i.join() # print(n) # GIL全局解释器锁:特殊的互斥锁 # python执行原理 # 第一步:开启进程以后 # 第二步:将python解释器加载到内存,将代码加载到内存 # 第三步:有多个线程,和垃圾回收线程 # 线程想执行,都是把代码传到解释器里去执行 # 为了解决多个线程与垃圾回收线程共享数据 的问题,所以要给解释器上锁。 # GIL主要是解决多线程下共享数据问题 # GIL不限制多进程 # 垃圾回收线程过一段时间自己开启,自己关闭 # 区别: # GIL解决垃圾回收的数据 # 自己写的互斥锁,解决的是代码中共享的数据 # ?感觉多线程用不到多核啊,实现不了并行啊 # 并发:看起来像一起运行 # 并行:多核 --对多进程有利 # # 多核:对多进程有用 # 单核:多进程根本用不到,所以要开多线程 # # 假设:任务是计算密集型:计算 那么开多进程 如:金融计算,数据处理,人工智能 # 任务是IO密集型:sleep()和文件交互 那么我开多线程 如socket,爬虫,web # 死锁:你要知道走的流程就好理解了 # 按照小品说的:林:你给我出示证件,我就帮你打开箱子。 黄:你要不给我打开箱子,我就没法给你出示证件。 林:你要不给我出示证件,我就没法给你打开箱子。 黄:你先给我打开箱子,我后给你出示证件嘛! 林:你先给我出示证件,我后给你打开箱子嘛! # from threading import Thread,Lock # import time # mutexA = Lock() # mutexB = Lock() # # class MyThread(Thread): # def run(self): # self.fun1() # self.fun2() # def fun1(self): # mutexA.acquire() # print("%s拿到了A锁"%(self.name)) # mutexB.acquire() # print("%s拿到了B锁" % (self.name)) # mutexB.release() # mutexA.release() # def fun2(self): # mutexB.acquire() # print("%s拿到了B锁"%(self.name)) # time.sleep(1) # mutexA.acquire() # print("%s拿到了A锁" % (self.name)) # mutexA.release() # mutexB.release() # # if __name__ == "__main__": # for i in range(10): # t = MyThread() # t.start() # """ # Thread-1拿到了A锁 # Thread-1拿到了B锁 # Thread-1拿到了B锁 # Thread-2拿到了A锁 # """ # # 递归锁:解决死锁,RLock() # 递归锁原理:可以连续acquire,他会计数 # 只有计数为0的时候才能被别的线程抢到 # from threading import Thread,RLock # import time # # mutexB=mutexA=RLock() # # class MyThread(Thread): # def run(self): # self.fun1() # self.fun2() # def fun1(self): # mutexA.acquire() # print("%s拿到A锁"%(self.name)) # mutexB.acquire() # print("%s拿到B锁" % (self.name)) # mutexB.release() # mutexA.release() # def fun2(self): # time.sleep(3) # mutexB.acquire() # print("%s拿到B锁"%(self.name)) # time.sleep(1) # mutexA.acquire() # print("%s拿到A锁" % (self.name)) # mutexA.release() # mutexB.release() # # if __name__ == "__main__": # for i in range(5): # t = MyThread() # t.start() # 信号量:比如:一个厕所5个坑,5个坑被占了,其他人就得等,这会就得上锁 # from threading import Thread,Semaphore,currentThread # import time,random # sm = Semaphore(5) # # def task(): # with sm: # print("%s正在上厕所"%(currentThread().getName())) # tm = random.randint(1,6) # time.sleep(tm) # print("%s用时%s秒"%(currentThread().getName(),tm)) # # if __name__ == "__main__": # for i in range(10): # t = Thread(target=task) # t.start() # Event事件: # from threading import Thread,Event,currentThread # import time # # event = Event() # # def student(): # print("学生%s正在上课"%(currentThread().getName())) # event.wait(3) # 我等三秒,我就走 # print("学生%s课间活动" % (currentThread().getName())) # # def teacher(): # print("老师%s正在上课"%(currentThread().getName())) # time.sleep(7) # event.set() # print(event.isSet()) # 判断set是否被设置 # print(event.is_set()) # event.clear() #将他返回原来的状态 # print(event.is_set()) # # if __name__ == "__main__": # s1 = Thread(target=student) # s2 = Thread(target=student) # s3 = Thread(target=student) # t1 = Thread(target=teacher) # # s1.start() # s2.start() # s3.start() # t1.start() # 定时器: # 例子一:简单实用 # from threading import Timer # # def task(name): # print("%s is running"%(name)) # # t = Timer(5,task,args=(" 3ξ",)) # t.start() # 定时器 # 例子二:写验证码 # from threading import Timer,Thread # import string,random # # class Code: # #初始化 # def __init__(self): # self.make_cache() # # #循环调用定时器 # def make_cache(self,interval=5): # self.code = self.make_code() # self.t = Timer(interval,self.make_cache) # self.t.start() # # #创建默认4位的验证码 # def make_code(self,n=4): # res = "" # for i in range(n): # res+=random.choice(list(string.ascii_letters+string.digits)) # print(res) # return res # # #循环等待验证 # def check(self): # while True: # s = input("请输入验证码") # if not s.strip() : continue # if s.strip().upper() == self.code.upper(): # print("验证成功") # self.t.cancel() # 关闭定时器 # return # else: # print("验证码输入错误") # # c = Code() # c.check() # 线程queue:解决共享数据时锁的问题 # 进程不共享数据,线程共享数据。queue解决共享数据和锁的问题 # 基本用法 # from threading import Thread # import queue # # q=queue.Queue(3) #队列,先进先出 # q.put("dsada") # q.put(99) # q.put([1,2,3]) # # q.put("haha",block=False,timeout=3) #True堵塞,等待,加上timeout 就是最多等3秒,就不等了,如果是False不堵塞,后面的time加不加无所谓 # q.get() # q.get() # q.get() # # q.get(block=True,timeout=3) #跟那个一样 # 栈:就是队列的变形 ,后进先出也在queue模块内 # import queue # q = queue.LifoQueue() # #优先级队列:谁的优先级高谁先出来 # import queue # q = queue.PriorityQueue() # q.put([30,"数据"]) #数字越小,优先级越高 # #简单队列 # q = queue.SimpleQueue() # 线程池,进程池,是为了解决服务端线程太多的问题 # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import time,random,os # from multiprocessing import current_process # # def task(name): # print("%s is running pid:%s"%(name,os.getpid())) # time.sleep(random.randint(1,3)) # # if __name__ == "__main__": # pool = ProcessPoolExecutor(4) #默认是cpu的核数 # for i in range(10): # pool.submit(task,"进程%s"%(i)) # # pool.shutdown() #不让在开启进程了,计数开始,相当于jion # print("主函数") # 同步调用:相当于串行,在原地等待 # 异步调用:相当于并发,不在原地等待 # 例子举办大胃王比赛:吃面包的功能,计算吃面包的个数 # 1.同步调用 # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import time,random # # def eat(name): # print("%s is eatting..."%name) # time.sleep(random.randint(4,7)) # return {"name":name,"count":random.randint(1,10)} # # def counts(data): # name = data["name"] # count = data["count"] # print("%s 吃了%s个面包"%(name,count)) # # if __name__ == "__main__": # pool = ThreadPoolExecutor(5) # res1 = pool.submit(eat,"张三").result() # counts(res1) # res2 = pool.submit(eat,"李四").result() # counts(res1) # res3 = pool.submit(eat,"王五").result() # counts(res1) # # 2.异步调用,为了优化代码,为了不破坏代码逻辑,所以我们要用回调函数 # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import time,random # # def eat(name): # print("%s is eatting..."%name) # time.sleep(random.randint(4,7)) # return {"name":name,"count":random.randint(1,10)} # # def counts(data): # data = data.result() # name = data["name"] # count = data["count"] # print("%s 吃了%s个面包"%(name,count)) # # if __name__ == "__main__": # pool = ThreadPoolExecutor(5) # pool.submit(eat,"张三").add_done_callback(counts) # #在后面加了回调函数,相当于把前面这个对象传给后面的函数,前面的是futures类型 duixiang.result()才是返回值 # res2 = pool.submit(eat,"李四").add_done_callback(counts) # # res3 = pool.submit(eat,"王五").add_done_callback(counts) #
View Code
线程结构:
1.两种启动方式
2.线程的属性
3.守护线程
4.互斥锁
5.GIL全局解释器锁:特殊的互斥锁
6.死锁
7.递归锁
8.信号量
9.Event事件
10.定时器:用来写验证码
11.队列,栈
12.池子
13.异步调用+回调函数
正题来了=============================
""" # from threading import Thread,currentThread,Lock,Semaphore,Event,Timer # from concurrent.futures import ThreadPoolExecutor # import time,os,random # from threading import active_count from threading import current_thread 获取线程名字 currentThread().getName() Thread().getName() 修改线程名字 currentThread().setName("新名字") Thread().setName("新名字") 开启线程 Thread().start() 等待线程结束 Thread().join() 获取线程在哪个进程中的pid os.getpid() 获取父进程的pid os.getppid() 设置守护线程 Thread().daemon = True Thread().setDaemon(True) 获取当前活跃的线程数 active_count() 线程是否活跃 Thread().is_alive() (一个锁)互斥锁:(GIL全局解释器锁:特殊的互斥锁) mutex = Lock() mutex.acquire() mutex.release() (两个及以上的锁,会发生死锁)---解决--》(递归锁解决) 信号量(本质是锁) 跟 线程池 有区别? 1、实际工作的线程是谁创建的? 使用线程池,实际工作线程由线程池创建;使用Seamphore,实际工作的线程由你自己创建。 2、限流是否自动实现? 线程池自动,Seamphore手动。 信号量怎么玩? sm = Semaphore(2) def fun(): with sm: pass Event事件: 碰到等待的时候我就用这个事件 event = Event() event.wait(3) event.set() event.isSet() event.clear() 定时器:(多少秒以后去执行哪个函数) t = Timer(5,fun3,args=("wusen",)) t.start() 队列:(线程与进程的队列 在不同的模块内) import queue q=queue.Queue(3) #队列,先进先出 # q.put("haha",block=False,timeout=3) #True堵塞,等待,加上timeout 就是最多等3秒,就不等了,如果是False不堵塞,后面的time加不加无所谓 q.get() 线程池 tps = ThreadPoolExecutor(2) #最大线程数 for i in range(10): tps.submit(fun3,"参数一","参数二") tps.shutdown() 线程池基础上:同步调用(效率不高)--解决-->异步调用+回调函数() def fun5(name): print("%s is running" % (name)) ts = random.randint(4 ,10) time.sleep(ts) return {"name":name,"counts":ts} def fun6(data): print(data) data = data.result() print(data) print("%s生产商品%s个"%(data["name"],data["counts"])) tps = ThreadPoolExecutor(3) for i in range(10): tps.submit(fun5,"t%s"%(i)).add_done_callback(fun6) tps.shutdown() """