Python 多线程并发程序设计与分析

简介: Python 多线程并发程序设计与分析

多线程并发程序设计与分析

 


1.技术难点分析与总结

难点1:线程运行时,运行顺序不固定


难点2:同一段代码,再不加锁的情况下,可能被多个线程同时执行,这会造成很多麻烦,比如变量的赋值不正确,方法的重复调用,而如果加锁,或者通过join阻塞方式等来控制,那么又如同运行单进程,效率低下,达不到,并发高速的效果。


难点3:不通过join阻塞等方式,主线程可能会优先于子线程退出,这也会导致问题,比如子线程还在用文件句柄,主线程就把文件关闭了。


解决方法:

1、考虑为线程类添加变量属性,这样一来,每个线程都拥有自己的变量,互不影响,比如下面例子中用到的run_times

 

2、线程公用的一些变量,也可以考虑通过线程类的变量属性传递,比如下面例子中多线程用到的文件句柄file_handler


3、必要时,关键代码可以考虑枷锁LockRLock具体自己看官方文档,比如下方的文件写入,不同线程可能会在同一行写入数据,导致数据统计时不准确,所以加锁,如果出于速度考虑,可以考虑分别给每个线程传递属于自己的文件句柄,写入不同的文件,


4、清理工作,关于这个,需要知道2点:

1)main线程退出时,不会kill非守护线程,但是会kill守护线程


2)通常,子线程start()后会去调用run方法,运行完run方法,子线程停止执行,不会继续运行之后的代码。

所以,通常我们可以这么做,获取当前活动线程数,如果线程数为1,则说明子线程都运行完,可以继续后面的代码清理工作,否则继续循环检测,这里还可以加代码优化,比如每隔一段时间检测一次,以免主线程浪费系统资源


   # 利用主线程执行清理工作
   current_active_thread_num =len(threading.enumerate())# 获取当前活动线程数量
while  current_active_thread_num !=1:
   time.sleep(10) #10检测一次
   current_active_thread_num =len(threading.enumerate())

 

2.代码实践

requestpy.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

__author__ ='shouke'

importurllib.request
importjson
importsys
importthreading
fromcollectionsimportCounter
importtime
importdatetime


classSubThread(threading.Thread):
   mutex_lock = threading.RLock()
   def__init__(self, file_handler):
       self.file_handler = file_handler
       self.run_times =0# 记录每个线程的运行次数
       threading.Thread.__init__(self)

   defrun(self):
       whileself.run_times <</span> int(sys.argv[2]):
           url ='http://xxxxxx/xxxxxcard/kq/codepool/test/'

           request = urllib.request.Request(url,method='POST')
           try:
               response = urllib.request.urlopen(request)
               response_body = response.read()
               response_body = response_body.decode('utf-8')
               response_body = json.loads(response_body)

                # 写入文件
               SubThread.mutex_lock.acquire()
              self.file_handler.write(str(response_body['code']))
              self.file_handler.write('\n')
              SubThread.mutex_lock.release()



               self.run_times =self.run_times +1# 记录每个线程的运行次数
               print('已经执行%s次请求'%str(self.run_times))
           exceptExceptionase:
               print('请求错误%s'% e)

defanalyze(test_result_data):
   list_data = []       # 存放目标数据
   total_line_count =0  # 读取的文本行数
   abnormal_line =0    # 存放异常数据行数
   digit_line =0       # 存放正确数据函数

   with  open(test_result_data,'r')asfile:
       line = file.readline()
       whileline:
           line = line.strip('\n')
           ifline.isdigit()andlen(line) ==12:
               list_data.append(int(line))
               digit_line = digit_line +1
           else:
               abnormal_line = abnormal_line +1
               print('服务器返回数据异常')

           line = file.readline()
           total_line_count = total_line_count +1


   print('读取的总行数:%s'%str(total_line_count))
   print('数据正确的行数:%s'%str(digit_line))
   print('数据异常的行数:%s'%str(abnormal_line))

   # 分析是否存在重复数据
   set_data =set(list_data)
   iflen(set_data) ==len(list_data):
       print('不存在重复数据,总数:%s'%len(list_data))
   else:
       print('有重复数据,重复数据:%s'% (len(list_data) -len(set_data)))

if__name__ =='__main__':
   start_time = datetime.datetime.now()

  test_result_data ='d:\\test_result_data.txt'
   file =  open(test_result_data,'w')  # 存储服务器返回数据

   threads_pool = []  # 线程池,存放线程对象
   thread_num =0  # 记录创建的线程数量

   whilethread_num <</span> int(sys.argv[1]):
       thread_obj = SubThread(file)
       threads_pool.append(thread_obj)
       thread_num = thread_num +1

   forthreadinthreads_pool:
       thread.start()

   # 利用主线程执行清理工作
   current_active_thread_num =len(threading.enumerate())# 获取当前活动线程数量
   while  current_active_thread_num !=1:
       time.sleep(10)
       current_active_thread_num =len(threading.enumerate())

 


   # 清理工作
   try:
       file.close()
   exceptExceptionase:
       print('关闭文件出错%s'% e)
 

end_time = datetime.datetime.now()
   print('运行耗时:',end_time - start_time)
 

# 分析数据
   analyze(test_result_data)

运行(禁用time.sleep函数的情况下):

100个线程,每个线程运行50次,总的运行5000

python requestpy.py10050

 

 

修改程序如下


class SubThread(threading.Thread):

   def __init__(self, file_handler):

       self.file_handler = file_handler

       self.run_times = 0 # 记录每个线程的运行次数
       threading.Thread.__init__(self)


   def run(self):

       while self.run_times < int(sys.argv[2]):

           url = 'http://xxxxxx/xxxxxcard/kq/codepool/test/'

           request = urllib.request.Request(url, method='POST')

           try:

               response = urllib.request.urlopen(request)

               response_body = response.read()

               response_body = response_body.decode('utf-8')

               response_body = json.loads(response_body)


               # 写入文件
               self.file_handler.write(str(response_body['code']))

               self.file_handler.write('\n')


               self.run_times = self.run_times + 1 # 记录每个线程的运行次数
               print('已经执行%s次请求' % str(self.run_times))

           except Exception as e:

               print('请求错误%s' % e)


def analyze(test_result_file_list):

   list_data = []       # 存放目标数据
   total_line_count = 0  # 读取的文本行数
   abnormal_line = 0    # 存放异常数据行数
   digit_line = 0       # 存放正确数据函数

   for file in test_result_file_list:

       with  open(file, 'r') as file:

           line = file.readline()

           while line:

               line = line.strip('\n')

               if line.isdigit() and len(line) == 12:

                   list_data.append(int(line))

                   digit_line = digit_line + 1

               else:

                   abnormal_line = abnormal_line + 1

                   print('服务器返回数据异常')


               line = file.readline()

               total_line_count = total_line_count + 1



   print('读取的总行数:%s' % str(total_line_count))

   print('数据正确的行数:%s' % str(digit_line))

   print('数据异常的行数:%s' % str(abnormal_line))


   # 分析是否存在重复数据
   set_data = set(list_data)

   if len(set_data) == len(list_data):

       print('不存在重复数据, 总数:%s 条' % len(list_data))

   else:

       print('有重复数据,重复数据:%s条' % (len(list_data) - len(set_data)))


   # 获取重复数据
   filehaneder = open('d:\\repeat_data.txt', 'w')

   c = Counter(list_data)

   for item in c.items():

       if item[1] > 1:

           print('重复数据:%s' % item[0])

           filehaneder.write(str(item[0]))

           filehaneder.write('\n')

   filehaneder.close()



if __name__ == '__main__':

   start_time = datetime.datetime.now()

   base_filename = 'test_result_data'
   base_dirname = 'd:\\result\\'
   test_result_file_list = [] # 存储结果数据文件名
   sub_thread_file_list = [] # 每个线程的文件句柄

   threads_pool = []  # 线程池,存放线程对象
   thread_num = 0  # 记录创建的线程数量

   while thread_num < int(sys.argv[1]):

       filename = base_dirname + base_filename + str(thread_num + 1) + '.txt'
       test_result_file_list.append(filename)

       file =  open(filename, 'w')

       sub_thread_file_list.append(file)


       thread_obj = SubThread(file)

       threads_pool.append(thread_obj)

       thread_num = thread_num + 1


   for thread in threads_pool:

       thread.start()


   # # 利用主线程执行清理工作
   current_active_thread_num = len(threading.enumerate()) # 获取当前活动线程数量
   while  current_active_thread_num != 1:

       time.sleep(300)
       current_active_thread_num = len(threading.enumerate())


   # 清理工作
   try:

       for file in sub_thread_file_list:

           file.close()

   except Exception as e:

       print('关闭文件出错%s' % e)

   end_time = datetime.datetime.now()

   print('运行耗时:',end_time - start_time)

   # 分析数据
   analyze(test_result_file_list)

运行结果:

目录
相关文章
|
1月前
|
机器学习/深度学习 数据采集 TensorFlow
使用Python实现智能食品消费模式分析的深度学习模型
使用Python实现智能食品消费模式分析的深度学习模型
128 70
|
3天前
|
机器学习/深度学习 数据可视化 数据挖掘
使用Python实现基于矩阵分解的长期事件(MFLEs)时间序列分析
在现代数据分析中,高维时间序列数据的处理和预测极具挑战性。基于矩阵分解的长期事件(MFLEs)分析技术应运而生,通过降维和时间序列特性结合,有效应对大规模数据。MFLE利用矩阵分解提取潜在特征,降低计算复杂度,过滤噪声,并发现主要模式。相比传统方法如ARIMA和深度学习模型如LSTM,MFLE在多变量处理、计算效率和可解释性上更具优势。通过合理应用MFLE,可在物联网、金融等领域获得良好分析效果。
16 0
使用Python实现基于矩阵分解的长期事件(MFLEs)时间序列分析
|
5天前
|
数据可视化 算法 数据挖掘
Python时间序列分析工具Aeon使用指南
**Aeon** 是一个遵循 scikit-learn API 风格的开源 Python 库,专注于时间序列处理。它提供了分类、回归、聚类、预测建模和数据预处理等功能模块,支持多种算法和自定义距离度量。Aeon 活跃开发并持续更新至2024年,与 pandas 1.4.0 版本兼容,内置可视化工具,适合数据探索和基础分析任务。尽管在高级功能和性能优化方面有提升空间,但其简洁的 API 和完整的基础功能使其成为时间序列分析的有效工具。
60 37
Python时间序列分析工具Aeon使用指南
|
1天前
|
机器学习/深度学习 运维 数据可视化
Python时间序列分析:使用TSFresh进行自动化特征提取
TSFresh 是一个专门用于时间序列数据特征自动提取的框架,支持分类、回归和异常检测等机器学习任务。它通过自动化特征工程流程,处理数百个统计特征(如均值、方差、自相关性等),并通过假设检验筛选显著特征,提升分析效率。TSFresh 支持单变量和多变量时间序列数据,能够与 scikit-learn 等库无缝集成,适用于大规模时间序列数据的特征提取与模型训练。其工作流程包括数据格式转换、特征提取和选择,并提供可视化工具帮助理解特征分布及与目标变量的关系。
30 16
Python时间序列分析:使用TSFresh进行自动化特征提取
|
1月前
|
机器学习/深度学习 数据采集 TensorFlow
使用Python实现智能食品消费习惯分析的深度学习模型
使用Python实现智能食品消费习惯分析的深度学习模型
149 68
|
1月前
|
机器学习/深度学习 数据采集 数据挖掘
使用Python实现智能食品消费市场分析的深度学习模型
使用Python实现智能食品消费市场分析的深度学习模型
127 36
|
1月前
|
数据可视化 算法 数据挖掘
Python量化投资实践:基于蒙特卡洛模拟的投资组合风险建模与分析
蒙特卡洛模拟是一种利用重复随机抽样解决确定性问题的计算方法,广泛应用于金融领域的不确定性建模和风险评估。本文介绍如何使用Python和EODHD API获取历史交易数据,通过模拟生成未来价格路径,分析投资风险与收益,包括VaR和CVaR计算,以辅助投资者制定合理决策。
78 15
|
1月前
|
机器学习/深度学习 数据采集 数据挖掘
使用Python实现智能食品消费趋势分析的深度学习模型
使用Python实现智能食品消费趋势分析的深度学习模型
125 18
|
2月前
|
测试技术 开发者 Python
使用Python解析和分析源代码
本文介绍了如何使用Python的`ast`模块解析和分析Python源代码,包括安装准备、解析源代码、分析抽象语法树(AST)等步骤,展示了通过自定义`NodeVisitor`类遍历AST并提取信息的方法,为代码质量提升和自动化工具开发提供基础。
100 8
|
1月前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
63 4