30天拿下Python之使用多线程

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 30天拿下Python之使用多线程

概述

在上一节,我们介绍了如何在Python中使用MySQL,包括:mysql.connector简介、mysql.connector的函数、使用mysql.connector等内容。在这一节,我们将介绍如何在Python中使用多线程。多线程是指一个程序同时运行多个线程,每个线程独立执行不同的任务。在当今的计算机科学领域,多线程技术已成为解决高并发、性能优化等问题的重要手段。Python通过内置的threading模块,提供了强大的多线程支持。在实际应用中,合理地使用多线程可以帮助我们提高程序的执行效率,实现并行计算,优化资源利用和用户体验。

使用Python的threading模块,我们可以创建和管理线程。线程是进程的基本执行单元,它们在进程的内部并行执行。在Python中,由于全局解释器锁(GIL)的存在,多线程在CPU密集型任务中可能不会提高执行速度。但在IO密集型任务中,比如:网络请求、文件读写等,使用多线程可以显著提高程序的执行效率。

threading模块

Python的threading模块是用于提供线程支持的,以下是threading模块中一些常用的函数和类。

threading.Thread(target, name, args, kwargs):创建线程的主要方法,target是要执行的函数,name是线程的名字,args和kwargs是传递给函数的参数。

threading.current_thread():返回当前的线程对象。

threading.enumerate():返回当前所有活跃的Thread对象列表。

threading.active_count():返回当前活跃的Thread对象数量。

threading.Lock():线程锁,用于防止多个线程同时访问某些资源造成数据混乱。

threading.RLock():可重入线程锁,允许线程在已经持有锁的情况下,再次获取同一个锁。

threading.Event(): 用于创建事件对象,以便进行线程间的通信。

threading.Condition():条件变量,用于让一个线程等待,直到特定条件成立。

threading.Semaphore():信号量,用于限制同时访问特定资源的线程数量。

threading.BoundedSemaphore():有边界的信号量,与Semaphore不同,它会限制信号量的上限。

threading.Timer(interval, function, args, kwargs):在指定的时间间隔后,执行一个操作。

threading.local():创建一个线程局部数据对象,每个线程都有自己的数据副本。

使用线程

在Python的threading模块中,Thread类是用来创建线程的对象。一个基本的Thread对象可以参照下面的示例代码进行创建。

import time
import threading
def print_numbers():
    for i in range(5):
        time.sleep(1)
        print('number is:', i)
# 创建线程
t = threading.Thread(target = print_numbers)
# 启动线程
t.start()
# 等待线程结束
t.join()


在上面的示例代码中,我们定义了一个函数print_numbers(),然后创建了一个新的Thread对象,目标函数就是print_numbers()。调用t.start()会启动这个线程,然后你的函数就会在新的线程中自动运行。调用t.join()会等待线程执行结束,在这个例子中,就是等待print_numbers()函数中的for循环执行结束。

当然,也可以同时创建多个线程,参看下面的示例代码。

import time
import threading
def print_numbers():
    name = threading.current_thread().name
    for i in range(5):
        time.sleep(1)
        print('%s, number is: %d'%(name, i))
# 创建线程
t1 = threading.Thread(target = print_numbers, name = 'thread 1')
t2 = threading.Thread(target = print_numbers, name = 'thread 2')
# 启动线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()


可以看到,上述示例代码运行后,线程1和线程2会交替输出信息。输出结果如下:

thread 2, number is: 0
thread 1, number is: 0
thread 1, number is: 1
thread 2, number is: 1
thread 1, number is: 2
thread 2, number is: 2
thread 1, number is: 3
thread 2, number is: 3
thread 1, number is: 4
thread 2, number is: 4


除了threading.Thread,我们还可以使用hreading.Timer在线程中运行指定的任务。threading.Timer主要用于创建定时器,以便在指定的时间间隔后执行一个操作。需要注意的是:threading.Timer是在一个新的线程中运行的,如果函数涉及到修改共享数据和资源,可能需要使用适当的同步机制来避免并发问题。

import threading  
 
def print_msg():
    print("Hello World")
# 创建定时器,3秒后执行print_msg函数
timer = threading.Timer(3, print_msg)
# 开始计时器
timer.start()


在上面的示例代码中,程序将等待3秒,然后打印“Hello World”的字符串。

创建自定义线程

在Python中,可以通过继承threading.Thread类来创建自定义的线程类。在自定义的线程类中,通常会重写一些函数以更改默认行为,比如:run()函数。

import threading
# 自定义线程类
class MyThread(threading.Thread):
    def __init__(self, data):
        threading.Thread.__init__(self)
        self.data = data
 
    def run(self):
        # 输出:Hello World
        print(f"Hello {self.data}")
# 创建自定义线程类的对象
my_thread = MyThread('World')
# 启动线程
my_thread.start()
# 等待线程结束
my_thread.join()



在上面的示例代码中,我们创建了一个名为MyThread的新类,并继承了threading.Thread。在MyThread类的init函数中,首先调用父类threading.Thread的init函数来进行初始化,然后设置了一个名为data的属性。我们重写了run()函数,这样当调用my_thread.start()时,就会执行我们自定义的run()函数,而不是父类的。

下面是一个创建和使用自定义线程类更复杂的示例。

import time
import threading
class MyThread2(threading.Thread):
    def __init__(self, thread_id, name):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.name = name
 
    def run(self):
        print(f"start thread: {self.name}")
        for i in range(5):
            time.sleep(1)
            print(f"thread {self.name} is running")
        print(f"exit sub thread: {self.name}")
# 创建线程对象
threads = []
for i in range(3):
    thread = MyThread2(i, f"Thread-{i}")
    thread.start()
    threads.append(thread)
# 等待所有线程完成
for t in threads:
    t.join()
print('exit main thread')




在上面的示例代码中,MyThread2类有一个构造函数,该函数接受一个线程ID和一个名称作为参数,并在内部调用父类threading.Thread的构造函数。run()函数被重写以执行我们想要的线程任务:打印一条消息,然后休眠一秒,再打印另一条消息,重复5次。最后,我们创建了3个线程并启动它们,然后等待所有线程完成它们的任务。

线程同步

多线程编程可能会引发一些潜在的问题,比如:数据不一致、竞态条件等。为了解决这些问题,我们需要使用线程同步技术。线程同步是一种机制,用于协调多个线程的执行,以确保它们能正确、有效地共享资源或进行协作。Python提供了几种线程同步机制,包括:锁(Lock)、事件(Event)、条件(Condition)、信号量(Semaphore)。下面,将分别进行介绍。

1、锁(Lock)是最基本的线程同步机制。在Python中,我们可以使用threading.Lock类来实现。锁有两种状态:锁定和未锁定。当一个线程获得锁时,其他试图获得锁的线程将被阻塞,直到锁被释放。

import threading
lock = threading.Lock()
def thread_func():
    with lock:
        # 线程安全的代码块
        pass
def thread_func2():
    lock.acquire()
    # 线程安全的代码块
    lock.release()


在上面的示例代码中,thread_func()函数使用with lock的方式对共享资源进行锁定,thread_func()函数使用lock.acquire()和lock.release()配对调用的方式对共享资源进行锁定。

我们来看一看在多线程中使用锁的示例代码。

import time
import threading
class Counter:
    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()
 
    def increment(self):
        with self.lock:
            self.count += 1
            print(f"Count: {self.count}")
 
def worker(counter):
    for _ in range(10):
        counter.increment()
 
counter = Counter()
threads = []
for _ in range(3):
    t = threading.Thread(target = worker, args = (counter,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"Final Count: {counter.count}")



在上面的示例代码中,我们定义了一个Counter类,其中包含一个计数器和一个锁。increment函数会在增加计数器之前获取锁,并在完成后释放锁,这样就可以确保任何时候只有一个线程能够修改计数器。然后,我们创建了3个线程,每个线程都会尝试对计数器进行10次增量操作。由于使用了锁,所有的增量操作都会正确地被序列化,最后的计数将总是30。

2、事件(Event)用于线程间的通信。threading.Event类提供了一个线程可以设置的信号标志,其他线程可以等待这个标志被设置。

import threading
event = threading.Event()
def thread_func():
    # 阻塞线程,直到事件被设置
    event.wait()
    # 当事件被设置后执行的代码
    print('event waited')
event.set()
thread_func()


在上面的示例代码中,调用event.wait()函数时会被阻塞,只有当其他地方调用event.set()设置了事件信号时,才会解除阻塞,继续往下执行代码。

3、条件(Condition)用于更复杂的线程同步问题。threading.Condition类提供了一种等待某个条件满足的方式,它通常与锁一起使用。

import time
import threading
class SharedData:
    def __init__(self):
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)
        self.value = 0
 
    def increment(self):
        while True:
            self.condition.acquire()
            while self.value >= 10:
                self.condition.wait()
            self.value += 1
            print(f"Value increased to {self.value}")
            self.condition.notify_all()
            self.condition.release()
            time.sleep(1)
 
    def decrement(self):
        while True:
            self.condition.acquire()
            while self.value <= 0:
                self.condition.wait()
            self.value -= 1
            print(f"Value decreased to {self.value}")
            sleep_time = 2 if self.value <= 5 else 0.5
            self.condition.notify_all()
            self.condition.release()
            time.sleep(sleep_time)
 
sd = SharedData()
thread1 = threading.Thread(target = sd.increment)
thread2 = threading.Thread(target = sd.decrement)
thread1.start()
thread2.start()
thread1.join()
thread2.join()




在上面的示例代码中,SharedData类有一个value属性,以及一个Condition对象。increment和decrement方法都使用Condition来确保value始终在0和10之间。如果value超出这个范围,当前线程会调用wait方法,将自己放入等待队列,并释放锁,让其他线程有机会执行。当value回到有效范围时,线程会调用notify_all方法,唤醒等待队列中的所有线程。

4、信号量(Semaphore)用于限制对资源的访问。threading.Semaphore类提供了一个计数器,用于控制可以同时访问某个资源的线程数量。


import time
import threading
 
# 创建一个Semaphore,最大允许3个线程同时访问共享资源
semaphore = threading.Semaphore(3)
def MyWorker():
    # 获取Semaphore
    semaphore.acquire()
    # 访问共享资源的代码
    for i in range(6):
        print("MyWorker {} is working: {}".format(threading.current_thread().name, i))
        time.sleep(1)
    # 释放Semaphore
    semaphore.release()
# 创建5个线程
threads = []
for i in range(5):
    t = threading.Thread(target = MyWorker, name = str(i))
    t.start()
    threads.append(t)
# 等待所有线程完成
for t in threads:
    t.join()



在上面的示例代码中,我们创建了一个Semaphore,最大允许3个线程同时访问共享资源。每个线程在访问共享资源之前获取Semaphore,并在完成后释放Semaphore。这样,我们便可以确保任何时候最多只有3个线程同时访问共享资源。


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
安全 数据处理 开发者
Python中的多线程编程:从入门到精通
本文将深入探讨Python中的多线程编程,包括其基本原理、应用场景、实现方法以及常见问题和解决方案。通过本文的学习,读者将对Python多线程编程有一个全面的认识,能够在实际项目中灵活运用。
|
14天前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
26天前
|
Java Unix 调度
python多线程!
本文介绍了线程的基本概念、多线程技术、线程的创建与管理、线程间的通信与同步机制,以及线程池和队列模块的使用。文章详细讲解了如何使用 `_thread` 和 `threading` 模块创建和管理线程,介绍了线程锁 `Lock` 的作用和使用方法,解决了多线程环境下的数据共享问题。此外,还介绍了 `Timer` 定时器和 `ThreadPoolExecutor` 线程池的使用,最后通过一个具体的案例展示了如何使用多线程爬取电影票房数据。文章还对比了进程和线程的优缺点,并讨论了计算密集型和IO密集型任务的适用场景。
47 4
|
8天前
|
数据采集 Java Python
爬取小说资源的Python实践:从单线程到多线程的效率飞跃
本文介绍了一种使用Python从笔趣阁网站爬取小说内容的方法,并通过引入多线程技术大幅提高了下载效率。文章首先概述了环境准备,包括所需安装的库,然后详细描述了爬虫程序的设计与实现过程,包括发送HTTP请求、解析HTML文档、提取章节链接及多线程下载等步骤。最后,强调了性能优化的重要性,并提醒读者遵守相关法律法规。
41 0
|
1月前
|
Python
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。
|
1月前
|
Java Python
python知识点100篇系列(16)-python中如何获取线程的返回值
【10月更文挑战第3天】本文介绍了两种在Python中实现多线程并获取返回值的方法。第一种是通过自定义线程类继承`Thread`类,重写`run`和`join`方法来实现;第二种则是利用`concurrent.futures`库,通过`ThreadPoolExecutor`管理线程池,简化了线程管理和结果获取的过程,推荐使用。示例代码展示了这两种方法的具体实现方式。
python知识点100篇系列(16)-python中如何获取线程的返回值
|
1月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
31 3
|
1月前
|
并行计算 安全 Java
Python 多线程并行执行详解
Python 多线程并行执行详解
70 3
|
1月前
|
网络协议 安全 Java
难懂,误点!将多线程技术应用于Python的异步事件循环
难懂,误点!将多线程技术应用于Python的异步事件循环
67 0
|
1月前
|
安全 Java 数据库连接
Python多线程编程:竞争问题的解析与应对策略
Python多线程编程:竞争问题的解析与应对策略
24 0
下一篇
无影云桌面