Python 多线程并发程序设计与分析

简介: Python 多线程并发程序设计与分析

多线程并发程序设计与分析

 


1.技术难点分析与总结

难点1:线程运行时,运行顺序不固定


难点2:同一段代码,再不加锁的情况下,可能被多个线程同时执行,这会造成很多麻烦,比如变量的赋值不正确,方法的重复调用,而如果加锁,或者通过join阻塞方式等来控制,那么又如同运行单进程,效率低下,达不到,并发高速的效果。


难点3:不通过join阻塞等方式,主线程可能会优先于子线程退出,这也会导致问题,比如子线程还在用文件句柄,主线程就把文件关闭了。


解决方法:

1、考虑为线程类添加变量属性,这样一来,每个线程都拥有自己的变量,互不影响,比如下面例子中用到的run_times

 

2、线程公用的一些变量,也可以考虑通过线程类的变量属性传递,比如下面例子中多线程用到的文件句柄file_handler


3、必要时,关键代码可以考虑枷锁LockRLock具体自己看官方文档,比如下方的文件写入,不同线程可能会在同一行写入数据,导致数据统计时不准确,所以加锁,如果出于速度考虑,可以考虑分别给每个线程传递属于自己的文件句柄,写入不同的文件,


4、清理工作,关于这个,需要知道2点:

1)main线程退出时,不会kill非守护线程,但是会kill守护线程


2)通常,子线程start()后会去调用run方法,运行完run方法,子线程停止执行,不会继续运行之后的代码。

所以,通常我们可以这么做,获取当前活动线程数,如果线程数为1,则说明子线程都运行完,可以继续后面的代码清理工作,否则继续循环检测,这里还可以加代码优化,比如每隔一段时间检测一次,以免主线程浪费系统资源


   # 利用主线程执行清理工作
   current_active_thread_num =len(threading.enumerate())# 获取当前活动线程数量
while  current_active_thread_num !=1:
   time.sleep(10) #10检测一次
   current_active_thread_num =len(threading.enumerate())

 

2.代码实践

requestpy.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

__author__ ='shouke'

importurllib.request
importjson
importsys
importthreading
fromcollectionsimportCounter
importtime
importdatetime


classSubThread(threading.Thread):
   mutex_lock = threading.RLock()
   def__init__(self, file_handler):
       self.file_handler = file_handler
       self.run_times =0# 记录每个线程的运行次数
       threading.Thread.__init__(self)

   defrun(self):
       whileself.run_times <</span> int(sys.argv[2]):
           url ='http://xxxxxx/xxxxxcard/kq/codepool/test/'

           request = urllib.request.Request(url,method='POST')
           try:
               response = urllib.request.urlopen(request)
               response_body = response.read()
               response_body = response_body.decode('utf-8')
               response_body = json.loads(response_body)

                # 写入文件
               SubThread.mutex_lock.acquire()
              self.file_handler.write(str(response_body['code']))
              self.file_handler.write('\n')
              SubThread.mutex_lock.release()



               self.run_times =self.run_times +1# 记录每个线程的运行次数
               print('已经执行%s次请求'%str(self.run_times))
           exceptExceptionase:
               print('请求错误%s'% e)

defanalyze(test_result_data):
   list_data = []       # 存放目标数据
   total_line_count =0  # 读取的文本行数
   abnormal_line =0    # 存放异常数据行数
   digit_line =0       # 存放正确数据函数

   with  open(test_result_data,'r')asfile:
       line = file.readline()
       whileline:
           line = line.strip('\n')
           ifline.isdigit()andlen(line) ==12:
               list_data.append(int(line))
               digit_line = digit_line +1
           else:
               abnormal_line = abnormal_line +1
               print('服务器返回数据异常')

           line = file.readline()
           total_line_count = total_line_count +1


   print('读取的总行数:%s'%str(total_line_count))
   print('数据正确的行数:%s'%str(digit_line))
   print('数据异常的行数:%s'%str(abnormal_line))

   # 分析是否存在重复数据
   set_data =set(list_data)
   iflen(set_data) ==len(list_data):
       print('不存在重复数据,总数:%s'%len(list_data))
   else:
       print('有重复数据,重复数据:%s'% (len(list_data) -len(set_data)))

if__name__ =='__main__':
   start_time = datetime.datetime.now()

  test_result_data ='d:\\test_result_data.txt'
   file =  open(test_result_data,'w')  # 存储服务器返回数据

   threads_pool = []  # 线程池,存放线程对象
   thread_num =0  # 记录创建的线程数量

   whilethread_num <</span> int(sys.argv[1]):
       thread_obj = SubThread(file)
       threads_pool.append(thread_obj)
       thread_num = thread_num +1

   forthreadinthreads_pool:
       thread.start()

   # 利用主线程执行清理工作
   current_active_thread_num =len(threading.enumerate())# 获取当前活动线程数量
   while  current_active_thread_num !=1:
       time.sleep(10)
       current_active_thread_num =len(threading.enumerate())

 


   # 清理工作
   try:
       file.close()
   exceptExceptionase:
       print('关闭文件出错%s'% e)
 

end_time = datetime.datetime.now()
   print('运行耗时:',end_time - start_time)
 

# 分析数据
   analyze(test_result_data)

运行(禁用time.sleep函数的情况下):

100个线程,每个线程运行50次,总的运行5000

python requestpy.py10050

 

 

修改程序如下


class SubThread(threading.Thread):

   def __init__(self, file_handler):

       self.file_handler = file_handler

       self.run_times = 0 # 记录每个线程的运行次数
       threading.Thread.__init__(self)


   def run(self):

       while self.run_times < int(sys.argv[2]):

           url = 'http://xxxxxx/xxxxxcard/kq/codepool/test/'

           request = urllib.request.Request(url, method='POST')

           try:

               response = urllib.request.urlopen(request)

               response_body = response.read()

               response_body = response_body.decode('utf-8')

               response_body = json.loads(response_body)


               # 写入文件
               self.file_handler.write(str(response_body['code']))

               self.file_handler.write('\n')


               self.run_times = self.run_times + 1 # 记录每个线程的运行次数
               print('已经执行%s次请求' % str(self.run_times))

           except Exception as e:

               print('请求错误%s' % e)


def analyze(test_result_file_list):

   list_data = []       # 存放目标数据
   total_line_count = 0  # 读取的文本行数
   abnormal_line = 0    # 存放异常数据行数
   digit_line = 0       # 存放正确数据函数

   for file in test_result_file_list:

       with  open(file, 'r') as file:

           line = file.readline()

           while line:

               line = line.strip('\n')

               if line.isdigit() and len(line) == 12:

                   list_data.append(int(line))

                   digit_line = digit_line + 1

               else:

                   abnormal_line = abnormal_line + 1

                   print('服务器返回数据异常')


               line = file.readline()

               total_line_count = total_line_count + 1



   print('读取的总行数:%s' % str(total_line_count))

   print('数据正确的行数:%s' % str(digit_line))

   print('数据异常的行数:%s' % str(abnormal_line))


   # 分析是否存在重复数据
   set_data = set(list_data)

   if len(set_data) == len(list_data):

       print('不存在重复数据, 总数:%s 条' % len(list_data))

   else:

       print('有重复数据,重复数据:%s条' % (len(list_data) - len(set_data)))


   # 获取重复数据
   filehaneder = open('d:\\repeat_data.txt', 'w')

   c = Counter(list_data)

   for item in c.items():

       if item[1] > 1:

           print('重复数据:%s' % item[0])

           filehaneder.write(str(item[0]))

           filehaneder.write('\n')

   filehaneder.close()



if __name__ == '__main__':

   start_time = datetime.datetime.now()

   base_filename = 'test_result_data'
   base_dirname = 'd:\\result\\'
   test_result_file_list = [] # 存储结果数据文件名
   sub_thread_file_list = [] # 每个线程的文件句柄

   threads_pool = []  # 线程池,存放线程对象
   thread_num = 0  # 记录创建的线程数量

   while thread_num < int(sys.argv[1]):

       filename = base_dirname + base_filename + str(thread_num + 1) + '.txt'
       test_result_file_list.append(filename)

       file =  open(filename, 'w')

       sub_thread_file_list.append(file)


       thread_obj = SubThread(file)

       threads_pool.append(thread_obj)

       thread_num = thread_num + 1


   for thread in threads_pool:

       thread.start()


   # # 利用主线程执行清理工作
   current_active_thread_num = len(threading.enumerate()) # 获取当前活动线程数量
   while  current_active_thread_num != 1:

       time.sleep(300)
       current_active_thread_num = len(threading.enumerate())


   # 清理工作
   try:

       for file in sub_thread_file_list:

           file.close()

   except Exception as e:

       print('关闭文件出错%s' % e)

   end_time = datetime.datetime.now()

   print('运行耗时:',end_time - start_time)

   # 分析数据
   analyze(test_result_file_list)

运行结果:

目录
相关文章
|
21天前
|
机器学习/深度学习 数据采集 TensorFlow
使用Python实现智能食品消费模式分析的深度学习模型
使用Python实现智能食品消费模式分析的深度学习模型
114 70
|
23天前
|
机器学习/深度学习 数据采集 TensorFlow
使用Python实现智能食品消费习惯分析的深度学习模型
使用Python实现智能食品消费习惯分析的深度学习模型
125 68
|
19天前
|
机器学习/深度学习 数据采集 数据挖掘
使用Python实现智能食品消费市场分析的深度学习模型
使用Python实现智能食品消费市场分析的深度学习模型
96 36
|
13天前
|
数据可视化 算法 数据挖掘
Python量化投资实践:基于蒙特卡洛模拟的投资组合风险建模与分析
蒙特卡洛模拟是一种利用重复随机抽样解决确定性问题的计算方法,广泛应用于金融领域的不确定性建模和风险评估。本文介绍如何使用Python和EODHD API获取历史交易数据,通过模拟生成未来价格路径,分析投资风险与收益,包括VaR和CVaR计算,以辅助投资者制定合理决策。
60 15
|
17天前
|
机器学习/深度学习 数据采集 数据挖掘
使用Python实现智能食品消费趋势分析的深度学习模型
使用Python实现智能食品消费趋势分析的深度学习模型
77 18
|
26天前
|
测试技术 开发者 Python
使用Python解析和分析源代码
本文介绍了如何使用Python的`ast`模块解析和分析Python源代码,包括安装准备、解析源代码、分析抽象语法树(AST)等步骤,展示了通过自定义`NodeVisitor`类遍历AST并提取信息的方法,为代码质量提升和自动化工具开发提供基础。
43 8
|
29天前
|
数据采集 存储 数据处理
Python中的多线程编程及其在数据处理中的应用
本文深入探讨了Python中多线程编程的概念、原理和实现方法,并详细介绍了其在数据处理领域的应用。通过对比单线程与多线程的性能差异,展示了多线程编程在提升程序运行效率方面的显著优势。文章还提供了实际案例,帮助读者更好地理解和掌握多线程编程技术。
|
1月前
|
安全 Java
线程安全的艺术:确保并发程序的正确性
在多线程环境中,确保线程安全是编程中的一个核心挑战。线程安全问题可能导致数据不一致、程序崩溃甚至安全漏洞。本文将分享如何确保线程安全,探讨不同的技术策略和最佳实践。
41 6
|
1月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
58 6
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####