Python 多线程并发程序设计与分析

简介: Python 多线程并发程序设计与分析

多线程并发程序设计与分析

 


1.技术难点分析与总结

难点1:线程运行时,运行顺序不固定


难点2:同一段代码,再不加锁的情况下,可能被多个线程同时执行,这会造成很多麻烦,比如变量的赋值不正确,方法的重复调用,而如果加锁,或者通过join阻塞方式等来控制,那么又如同运行单进程,效率低下,达不到,并发高速的效果。


难点3:不通过join阻塞等方式,主线程可能会优先于子线程退出,这也会导致问题,比如子线程还在用文件句柄,主线程就把文件关闭了。


解决方法:

1、考虑为线程类添加变量属性,这样一来,每个线程都拥有自己的变量,互不影响,比如下面例子中用到的run_times

 

2、线程公用的一些变量,也可以考虑通过线程类的变量属性传递,比如下面例子中多线程用到的文件句柄file_handler


3、必要时,关键代码可以考虑枷锁LockRLock具体自己看官方文档,比如下方的文件写入,不同线程可能会在同一行写入数据,导致数据统计时不准确,所以加锁,如果出于速度考虑,可以考虑分别给每个线程传递属于自己的文件句柄,写入不同的文件,


4、清理工作,关于这个,需要知道2点:

1)main线程退出时,不会kill非守护线程,但是会kill守护线程


2)通常,子线程start()后会去调用run方法,运行完run方法,子线程停止执行,不会继续运行之后的代码。

所以,通常我们可以这么做,获取当前活动线程数,如果线程数为1,则说明子线程都运行完,可以继续后面的代码清理工作,否则继续循环检测,这里还可以加代码优化,比如每隔一段时间检测一次,以免主线程浪费系统资源


   # 利用主线程执行清理工作
   current_active_thread_num =len(threading.enumerate())# 获取当前活动线程数量
while  current_active_thread_num !=1:
   time.sleep(10) #10检测一次
   current_active_thread_num =len(threading.enumerate())

 

2.代码实践

requestpy.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

__author__ ='shouke'

importurllib.request
importjson
importsys
importthreading
fromcollectionsimportCounter
importtime
importdatetime


classSubThread(threading.Thread):
   mutex_lock = threading.RLock()
   def__init__(self, file_handler):
       self.file_handler = file_handler
       self.run_times =0# 记录每个线程的运行次数
       threading.Thread.__init__(self)

   defrun(self):
       whileself.run_times <</span> int(sys.argv[2]):
           url ='http://xxxxxx/xxxxxcard/kq/codepool/test/'

           request = urllib.request.Request(url,method='POST')
           try:
               response = urllib.request.urlopen(request)
               response_body = response.read()
               response_body = response_body.decode('utf-8')
               response_body = json.loads(response_body)

                # 写入文件
               SubThread.mutex_lock.acquire()
              self.file_handler.write(str(response_body['code']))
              self.file_handler.write('\n')
              SubThread.mutex_lock.release()



               self.run_times =self.run_times +1# 记录每个线程的运行次数
               print('已经执行%s次请求'%str(self.run_times))
           exceptExceptionase:
               print('请求错误%s'% e)

defanalyze(test_result_data):
   list_data = []       # 存放目标数据
   total_line_count =0  # 读取的文本行数
   abnormal_line =0    # 存放异常数据行数
   digit_line =0       # 存放正确数据函数

   with  open(test_result_data,'r')asfile:
       line = file.readline()
       whileline:
           line = line.strip('\n')
           ifline.isdigit()andlen(line) ==12:
               list_data.append(int(line))
               digit_line = digit_line +1
           else:
               abnormal_line = abnormal_line +1
               print('服务器返回数据异常')

           line = file.readline()
           total_line_count = total_line_count +1


   print('读取的总行数:%s'%str(total_line_count))
   print('数据正确的行数:%s'%str(digit_line))
   print('数据异常的行数:%s'%str(abnormal_line))

   # 分析是否存在重复数据
   set_data =set(list_data)
   iflen(set_data) ==len(list_data):
       print('不存在重复数据,总数:%s'%len(list_data))
   else:
       print('有重复数据,重复数据:%s'% (len(list_data) -len(set_data)))

if__name__ =='__main__':
   start_time = datetime.datetime.now()

  test_result_data ='d:\\test_result_data.txt'
   file =  open(test_result_data,'w')  # 存储服务器返回数据

   threads_pool = []  # 线程池,存放线程对象
   thread_num =0  # 记录创建的线程数量

   whilethread_num <</span> int(sys.argv[1]):
       thread_obj = SubThread(file)
       threads_pool.append(thread_obj)
       thread_num = thread_num +1

   forthreadinthreads_pool:
       thread.start()

   # 利用主线程执行清理工作
   current_active_thread_num =len(threading.enumerate())# 获取当前活动线程数量
   while  current_active_thread_num !=1:
       time.sleep(10)
       current_active_thread_num =len(threading.enumerate())

 


   # 清理工作
   try:
       file.close()
   exceptExceptionase:
       print('关闭文件出错%s'% e)
 

end_time = datetime.datetime.now()
   print('运行耗时:',end_time - start_time)
 

# 分析数据
   analyze(test_result_data)

运行(禁用time.sleep函数的情况下):

100个线程,每个线程运行50次,总的运行5000

python requestpy.py10050

 

 

修改程序如下


class SubThread(threading.Thread):

   def __init__(self, file_handler):

       self.file_handler = file_handler

       self.run_times = 0 # 记录每个线程的运行次数
       threading.Thread.__init__(self)


   def run(self):

       while self.run_times < int(sys.argv[2]):

           url = 'http://xxxxxx/xxxxxcard/kq/codepool/test/'

           request = urllib.request.Request(url, method='POST')

           try:

               response = urllib.request.urlopen(request)

               response_body = response.read()

               response_body = response_body.decode('utf-8')

               response_body = json.loads(response_body)


               # 写入文件
               self.file_handler.write(str(response_body['code']))

               self.file_handler.write('\n')


               self.run_times = self.run_times + 1 # 记录每个线程的运行次数
               print('已经执行%s次请求' % str(self.run_times))

           except Exception as e:

               print('请求错误%s' % e)


def analyze(test_result_file_list):

   list_data = []       # 存放目标数据
   total_line_count = 0  # 读取的文本行数
   abnormal_line = 0    # 存放异常数据行数
   digit_line = 0       # 存放正确数据函数

   for file in test_result_file_list:

       with  open(file, 'r') as file:

           line = file.readline()

           while line:

               line = line.strip('\n')

               if line.isdigit() and len(line) == 12:

                   list_data.append(int(line))

                   digit_line = digit_line + 1

               else:

                   abnormal_line = abnormal_line + 1

                   print('服务器返回数据异常')


               line = file.readline()

               total_line_count = total_line_count + 1



   print('读取的总行数:%s' % str(total_line_count))

   print('数据正确的行数:%s' % str(digit_line))

   print('数据异常的行数:%s' % str(abnormal_line))


   # 分析是否存在重复数据
   set_data = set(list_data)

   if len(set_data) == len(list_data):

       print('不存在重复数据, 总数:%s 条' % len(list_data))

   else:

       print('有重复数据,重复数据:%s条' % (len(list_data) - len(set_data)))


   # 获取重复数据
   filehaneder = open('d:\\repeat_data.txt', 'w')

   c = Counter(list_data)

   for item in c.items():

       if item[1] > 1:

           print('重复数据:%s' % item[0])

           filehaneder.write(str(item[0]))

           filehaneder.write('\n')

   filehaneder.close()



if __name__ == '__main__':

   start_time = datetime.datetime.now()

   base_filename = 'test_result_data'
   base_dirname = 'd:\\result\\'
   test_result_file_list = [] # 存储结果数据文件名
   sub_thread_file_list = [] # 每个线程的文件句柄

   threads_pool = []  # 线程池,存放线程对象
   thread_num = 0  # 记录创建的线程数量

   while thread_num < int(sys.argv[1]):

       filename = base_dirname + base_filename + str(thread_num + 1) + '.txt'
       test_result_file_list.append(filename)

       file =  open(filename, 'w')

       sub_thread_file_list.append(file)


       thread_obj = SubThread(file)

       threads_pool.append(thread_obj)

       thread_num = thread_num + 1


   for thread in threads_pool:

       thread.start()


   # # 利用主线程执行清理工作
   current_active_thread_num = len(threading.enumerate()) # 获取当前活动线程数量
   while  current_active_thread_num != 1:

       time.sleep(300)
       current_active_thread_num = len(threading.enumerate())


   # 清理工作
   try:

       for file in sub_thread_file_list:

           file.close()

   except Exception as e:

       print('关闭文件出错%s' % e)

   end_time = datetime.datetime.now()

   print('运行耗时:',end_time - start_time)

   # 分析数据
   analyze(test_result_file_list)

运行结果:

目录
相关文章
|
8天前
|
数据采集 缓存 定位技术
网络延迟对Python爬虫速度的影响分析
网络延迟对Python爬虫速度的影响分析
|
6天前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
9天前
|
API 数据处理 Python
探秘Python并发新世界:asyncio库,让你的代码并发更优雅!
在Python编程中,随着网络应用和数据处理需求的增长,并发编程变得愈发重要。asyncio库作为Python 3.4及以上版本的标准库,以其简洁的API和强大的异步编程能力,成为提升性能和优化资源利用的关键工具。本文介绍了asyncio的基本概念、异步函数的定义与使用、并发控制和资源管理等核心功能,通过具体示例展示了如何高效地编写并发代码。
20 2
|
10天前
|
数据采集 存储 JSON
Python爬虫开发中的分析与方案制定
Python爬虫开发中的分析与方案制定
|
12天前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
17天前
|
数据可视化 开发者 Python
Python GUI开发:Tkinter与PyQt的实战应用与对比分析
【10月更文挑战第26天】本文介绍了Python中两种常用的GUI工具包——Tkinter和PyQt。Tkinter内置于Python标准库,适合初学者快速上手,提供基本的GUI组件和方法。PyQt基于Qt库,功能强大且灵活,适用于创建复杂的GUI应用程序。通过实战示例和对比分析,帮助开发者选择合适的工具包以满足项目需求。
62 7
|
16天前
|
存储 数据处理 Python
Python科学计算:NumPy与SciPy的高效数据处理与分析
【10月更文挑战第27天】在科学计算和数据分析领域,Python凭借简洁的语法和强大的库支持广受欢迎。NumPy和SciPy作为Python科学计算的两大基石,提供了高效的数据处理和分析工具。NumPy的核心功能是N维数组对象(ndarray),支持高效的大型数据集操作;SciPy则在此基础上提供了线性代数、信号处理、优化和统计分析等多种科学计算工具。结合使用NumPy和SciPy,可以显著提升数据处理和分析的效率,使Python成为科学计算和数据分析的首选语言。
26 3
|
18天前
|
Java Unix 调度
python多线程!
本文介绍了线程的基本概念、多线程技术、线程的创建与管理、线程间的通信与同步机制,以及线程池和队列模块的使用。文章详细讲解了如何使用 `_thread` 和 `threading` 模块创建和管理线程,介绍了线程锁 `Lock` 的作用和使用方法,解决了多线程环境下的数据共享问题。此外,还介绍了 `Timer` 定时器和 `ThreadPoolExecutor` 线程池的使用,最后通过一个具体的案例展示了如何使用多线程爬取电影票房数据。文章还对比了进程和线程的优缺点,并讨论了计算密集型和IO密集型任务的适用场景。
38 4
|
17天前
|
存储 机器学习/深度学习 算法
Python科学计算:NumPy与SciPy的高效数据处理与分析
【10月更文挑战第26天】NumPy和SciPy是Python科学计算领域的两大核心库。NumPy提供高效的多维数组对象和丰富的数学函数,而SciPy则在此基础上提供了更多高级的科学计算功能,如数值积分、优化和统计等。两者结合使Python在科学计算中具有极高的效率和广泛的应用。
33 2
|
18天前
|
数据库 开发者 Python
“Python异步编程革命:如何从编程新手蜕变为并发大师,掌握未来技术的制胜法宝”
【10月更文挑战第25天】介绍了Python异步编程的基础和高级技巧。文章从同步与异步编程的区别入手,逐步讲解了如何使用`asyncio`库和`async`/`await`关键字进行异步编程。通过对比传统多线程,展示了异步编程在I/O密集型任务中的优势,并提供了最佳实践建议。
16 1