Python 多线程并发程序设计与分析

简介: Python 多线程并发程序设计与分析

多线程并发程序设计与分析

 


1.技术难点分析与总结

难点1:线程运行时,运行顺序不固定


难点2:同一段代码,再不加锁的情况下,可能被多个线程同时执行,这会造成很多麻烦,比如变量的赋值不正确,方法的重复调用,而如果加锁,或者通过join阻塞方式等来控制,那么又如同运行单进程,效率低下,达不到,并发高速的效果。


难点3:不通过join阻塞等方式,主线程可能会优先于子线程退出,这也会导致问题,比如子线程还在用文件句柄,主线程就把文件关闭了。


解决方法:

1、考虑为线程类添加变量属性,这样一来,每个线程都拥有自己的变量,互不影响,比如下面例子中用到的run_times

 

2、线程公用的一些变量,也可以考虑通过线程类的变量属性传递,比如下面例子中多线程用到的文件句柄file_handler


3、必要时,关键代码可以考虑枷锁LockRLock具体自己看官方文档,比如下方的文件写入,不同线程可能会在同一行写入数据,导致数据统计时不准确,所以加锁,如果出于速度考虑,可以考虑分别给每个线程传递属于自己的文件句柄,写入不同的文件,


4、清理工作,关于这个,需要知道2点:

1)main线程退出时,不会kill非守护线程,但是会kill守护线程


2)通常,子线程start()后会去调用run方法,运行完run方法,子线程停止执行,不会继续运行之后的代码。

所以,通常我们可以这么做,获取当前活动线程数,如果线程数为1,则说明子线程都运行完,可以继续后面的代码清理工作,否则继续循环检测,这里还可以加代码优化,比如每隔一段时间检测一次,以免主线程浪费系统资源


   # 利用主线程执行清理工作
   current_active_thread_num =len(threading.enumerate())# 获取当前活动线程数量
while  current_active_thread_num !=1:
   time.sleep(10) #10检测一次
   current_active_thread_num =len(threading.enumerate())

 

2.代码实践

requestpy.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

__author__ ='shouke'

importurllib.request
importjson
importsys
importthreading
fromcollectionsimportCounter
importtime
importdatetime


classSubThread(threading.Thread):
   mutex_lock = threading.RLock()
   def__init__(self, file_handler):
       self.file_handler = file_handler
       self.run_times =0# 记录每个线程的运行次数
       threading.Thread.__init__(self)

   defrun(self):
       whileself.run_times <</span> int(sys.argv[2]):
           url ='http://xxxxxx/xxxxxcard/kq/codepool/test/'

           request = urllib.request.Request(url,method='POST')
           try:
               response = urllib.request.urlopen(request)
               response_body = response.read()
               response_body = response_body.decode('utf-8')
               response_body = json.loads(response_body)

                # 写入文件
               SubThread.mutex_lock.acquire()
              self.file_handler.write(str(response_body['code']))
              self.file_handler.write('\n')
              SubThread.mutex_lock.release()



               self.run_times =self.run_times +1# 记录每个线程的运行次数
               print('已经执行%s次请求'%str(self.run_times))
           exceptExceptionase:
               print('请求错误%s'% e)

defanalyze(test_result_data):
   list_data = []       # 存放目标数据
   total_line_count =0  # 读取的文本行数
   abnormal_line =0    # 存放异常数据行数
   digit_line =0       # 存放正确数据函数

   with  open(test_result_data,'r')asfile:
       line = file.readline()
       whileline:
           line = line.strip('\n')
           ifline.isdigit()andlen(line) ==12:
               list_data.append(int(line))
               digit_line = digit_line +1
           else:
               abnormal_line = abnormal_line +1
               print('服务器返回数据异常')

           line = file.readline()
           total_line_count = total_line_count +1


   print('读取的总行数:%s'%str(total_line_count))
   print('数据正确的行数:%s'%str(digit_line))
   print('数据异常的行数:%s'%str(abnormal_line))

   # 分析是否存在重复数据
   set_data =set(list_data)
   iflen(set_data) ==len(list_data):
       print('不存在重复数据,总数:%s'%len(list_data))
   else:
       print('有重复数据,重复数据:%s'% (len(list_data) -len(set_data)))

if__name__ =='__main__':
   start_time = datetime.datetime.now()

  test_result_data ='d:\\test_result_data.txt'
   file =  open(test_result_data,'w')  # 存储服务器返回数据

   threads_pool = []  # 线程池,存放线程对象
   thread_num =0  # 记录创建的线程数量

   whilethread_num <</span> int(sys.argv[1]):
       thread_obj = SubThread(file)
       threads_pool.append(thread_obj)
       thread_num = thread_num +1

   forthreadinthreads_pool:
       thread.start()

   # 利用主线程执行清理工作
   current_active_thread_num =len(threading.enumerate())# 获取当前活动线程数量
   while  current_active_thread_num !=1:
       time.sleep(10)
       current_active_thread_num =len(threading.enumerate())

 


   # 清理工作
   try:
       file.close()
   exceptExceptionase:
       print('关闭文件出错%s'% e)
 

end_time = datetime.datetime.now()
   print('运行耗时:',end_time - start_time)
 

# 分析数据
   analyze(test_result_data)

运行(禁用time.sleep函数的情况下):

100个线程,每个线程运行50次,总的运行5000

python requestpy.py10050

 

 

修改程序如下


class SubThread(threading.Thread):

   def __init__(self, file_handler):

       self.file_handler = file_handler

       self.run_times = 0 # 记录每个线程的运行次数
       threading.Thread.__init__(self)


   def run(self):

       while self.run_times < int(sys.argv[2]):

           url = 'http://xxxxxx/xxxxxcard/kq/codepool/test/'

           request = urllib.request.Request(url, method='POST')

           try:

               response = urllib.request.urlopen(request)

               response_body = response.read()

               response_body = response_body.decode('utf-8')

               response_body = json.loads(response_body)


               # 写入文件
               self.file_handler.write(str(response_body['code']))

               self.file_handler.write('\n')


               self.run_times = self.run_times + 1 # 记录每个线程的运行次数
               print('已经执行%s次请求' % str(self.run_times))

           except Exception as e:

               print('请求错误%s' % e)


def analyze(test_result_file_list):

   list_data = []       # 存放目标数据
   total_line_count = 0  # 读取的文本行数
   abnormal_line = 0    # 存放异常数据行数
   digit_line = 0       # 存放正确数据函数

   for file in test_result_file_list:

       with  open(file, 'r') as file:

           line = file.readline()

           while line:

               line = line.strip('\n')

               if line.isdigit() and len(line) == 12:

                   list_data.append(int(line))

                   digit_line = digit_line + 1

               else:

                   abnormal_line = abnormal_line + 1

                   print('服务器返回数据异常')


               line = file.readline()

               total_line_count = total_line_count + 1



   print('读取的总行数:%s' % str(total_line_count))

   print('数据正确的行数:%s' % str(digit_line))

   print('数据异常的行数:%s' % str(abnormal_line))


   # 分析是否存在重复数据
   set_data = set(list_data)

   if len(set_data) == len(list_data):

       print('不存在重复数据, 总数:%s 条' % len(list_data))

   else:

       print('有重复数据,重复数据:%s条' % (len(list_data) - len(set_data)))


   # 获取重复数据
   filehaneder = open('d:\\repeat_data.txt', 'w')

   c = Counter(list_data)

   for item in c.items():

       if item[1] > 1:

           print('重复数据:%s' % item[0])

           filehaneder.write(str(item[0]))

           filehaneder.write('\n')

   filehaneder.close()



if __name__ == '__main__':

   start_time = datetime.datetime.now()

   base_filename = 'test_result_data'
   base_dirname = 'd:\\result\\'
   test_result_file_list = [] # 存储结果数据文件名
   sub_thread_file_list = [] # 每个线程的文件句柄

   threads_pool = []  # 线程池,存放线程对象
   thread_num = 0  # 记录创建的线程数量

   while thread_num < int(sys.argv[1]):

       filename = base_dirname + base_filename + str(thread_num + 1) + '.txt'
       test_result_file_list.append(filename)

       file =  open(filename, 'w')

       sub_thread_file_list.append(file)


       thread_obj = SubThread(file)

       threads_pool.append(thread_obj)

       thread_num = thread_num + 1


   for thread in threads_pool:

       thread.start()


   # # 利用主线程执行清理工作
   current_active_thread_num = len(threading.enumerate()) # 获取当前活动线程数量
   while  current_active_thread_num != 1:

       time.sleep(300)
       current_active_thread_num = len(threading.enumerate())


   # 清理工作
   try:

       for file in sub_thread_file_list:

           file.close()

   except Exception as e:

       print('关闭文件出错%s' % e)

   end_time = datetime.datetime.now()

   print('运行耗时:',end_time - start_time)

   # 分析数据
   analyze(test_result_file_list)

运行结果:

目录
相关文章
|
4月前
|
人工智能 安全 调度
Python并发编程之线程同步详解
并发编程在Python中至关重要,线程同步确保多线程程序正确运行。本文详解线程同步机制,包括互斥锁、信号量、事件、条件变量和队列,探讨全局解释器锁(GIL)的影响及解决线程同步问题的最佳实践,如避免全局变量、使用线程安全数据结构、精细化锁的使用等。通过示例代码帮助开发者理解并提升多线程程序的性能与可靠性。
175 0
|
1月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
134 2
|
1月前
|
设计模式 消息中间件 安全
【JUC】(3)常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!!文章全程笔记干货!!
JUC专栏第三篇,带你继续深入JUC! 本篇文章涵盖内容:保护性暂停、生产者与消费者、Park&unPark、线程转换条件、多把锁情况分析、可重入锁、顺序控制 笔记共享!!文章全程干货!
150 1
|
27天前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
201 0
|
2月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
|
3月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
233 1
|
4月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
4月前
|
数据采集 存储 Java
多线程Python爬虫:加速大规模学术文献采集
多线程Python爬虫:加速大规模学术文献采集
|
1月前
|
Java
如何在Java中进行多线程编程
Java多线程编程常用方式包括:继承Thread类、实现Runnable接口、Callable接口(可返回结果)及使用线程池。推荐线程池以提升性能,避免频繁创建线程。结合同步与通信机制,可有效管理并发任务。
139 6
|
4月前
|
Java API 微服务
为什么虚拟线程将改变Java并发编程?
为什么虚拟线程将改变Java并发编程?
296 83

推荐镜像

更多