Python 多线程并发程序设计与分析

简介: Python 多线程并发程序设计与分析

多线程并发程序设计与分析

 


1.技术难点分析与总结

难点1:线程运行时,运行顺序不固定


难点2:同一段代码,再不加锁的情况下,可能被多个线程同时执行,这会造成很多麻烦,比如变量的赋值不正确,方法的重复调用,而如果加锁,或者通过join阻塞方式等来控制,那么又如同运行单进程,效率低下,达不到,并发高速的效果。


难点3:不通过join阻塞等方式,主线程可能会优先于子线程退出,这也会导致问题,比如子线程还在用文件句柄,主线程就把文件关闭了。


解决方法:

1、考虑为线程类添加变量属性,这样一来,每个线程都拥有自己的变量,互不影响,比如下面例子中用到的run_times

 

2、线程公用的一些变量,也可以考虑通过线程类的变量属性传递,比如下面例子中多线程用到的文件句柄file_handler


3、必要时,关键代码可以考虑枷锁LockRLock具体自己看官方文档,比如下方的文件写入,不同线程可能会在同一行写入数据,导致数据统计时不准确,所以加锁,如果出于速度考虑,可以考虑分别给每个线程传递属于自己的文件句柄,写入不同的文件,


4、清理工作,关于这个,需要知道2点:

1)main线程退出时,不会kill非守护线程,但是会kill守护线程


2)通常,子线程start()后会去调用run方法,运行完run方法,子线程停止执行,不会继续运行之后的代码。

所以,通常我们可以这么做,获取当前活动线程数,如果线程数为1,则说明子线程都运行完,可以继续后面的代码清理工作,否则继续循环检测,这里还可以加代码优化,比如每隔一段时间检测一次,以免主线程浪费系统资源


   # 利用主线程执行清理工作
   current_active_thread_num =len(threading.enumerate())# 获取当前活动线程数量
while  current_active_thread_num !=1:
   time.sleep(10) #10检测一次
   current_active_thread_num =len(threading.enumerate())

 

2.代码实践

requestpy.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

__author__ ='shouke'

importurllib.request
importjson
importsys
importthreading
fromcollectionsimportCounter
importtime
importdatetime


classSubThread(threading.Thread):
   mutex_lock = threading.RLock()
   def__init__(self, file_handler):
       self.file_handler = file_handler
       self.run_times =0# 记录每个线程的运行次数
       threading.Thread.__init__(self)

   defrun(self):
       whileself.run_times <</span> int(sys.argv[2]):
           url ='http://xxxxxx/xxxxxcard/kq/codepool/test/'

           request = urllib.request.Request(url,method='POST')
           try:
               response = urllib.request.urlopen(request)
               response_body = response.read()
               response_body = response_body.decode('utf-8')
               response_body = json.loads(response_body)

                # 写入文件
               SubThread.mutex_lock.acquire()
              self.file_handler.write(str(response_body['code']))
              self.file_handler.write('\n')
              SubThread.mutex_lock.release()



               self.run_times =self.run_times +1# 记录每个线程的运行次数
               print('已经执行%s次请求'%str(self.run_times))
           exceptExceptionase:
               print('请求错误%s'% e)

defanalyze(test_result_data):
   list_data = []       # 存放目标数据
   total_line_count =0  # 读取的文本行数
   abnormal_line =0    # 存放异常数据行数
   digit_line =0       # 存放正确数据函数

   with  open(test_result_data,'r')asfile:
       line = file.readline()
       whileline:
           line = line.strip('\n')
           ifline.isdigit()andlen(line) ==12:
               list_data.append(int(line))
               digit_line = digit_line +1
           else:
               abnormal_line = abnormal_line +1
               print('服务器返回数据异常')

           line = file.readline()
           total_line_count = total_line_count +1


   print('读取的总行数:%s'%str(total_line_count))
   print('数据正确的行数:%s'%str(digit_line))
   print('数据异常的行数:%s'%str(abnormal_line))

   # 分析是否存在重复数据
   set_data =set(list_data)
   iflen(set_data) ==len(list_data):
       print('不存在重复数据,总数:%s'%len(list_data))
   else:
       print('有重复数据,重复数据:%s'% (len(list_data) -len(set_data)))

if__name__ =='__main__':
   start_time = datetime.datetime.now()

  test_result_data ='d:\\test_result_data.txt'
   file =  open(test_result_data,'w')  # 存储服务器返回数据

   threads_pool = []  # 线程池,存放线程对象
   thread_num =0  # 记录创建的线程数量

   whilethread_num <</span> int(sys.argv[1]):
       thread_obj = SubThread(file)
       threads_pool.append(thread_obj)
       thread_num = thread_num +1

   forthreadinthreads_pool:
       thread.start()

   # 利用主线程执行清理工作
   current_active_thread_num =len(threading.enumerate())# 获取当前活动线程数量
   while  current_active_thread_num !=1:
       time.sleep(10)
       current_active_thread_num =len(threading.enumerate())

 


   # 清理工作
   try:
       file.close()
   exceptExceptionase:
       print('关闭文件出错%s'% e)
 

end_time = datetime.datetime.now()
   print('运行耗时:',end_time - start_time)
 

# 分析数据
   analyze(test_result_data)

运行(禁用time.sleep函数的情况下):

100个线程,每个线程运行50次,总的运行5000

python requestpy.py10050

 

 

修改程序如下


class SubThread(threading.Thread):

   def __init__(self, file_handler):

       self.file_handler = file_handler

       self.run_times = 0 # 记录每个线程的运行次数
       threading.Thread.__init__(self)


   def run(self):

       while self.run_times < int(sys.argv[2]):

           url = 'http://xxxxxx/xxxxxcard/kq/codepool/test/'

           request = urllib.request.Request(url, method='POST')

           try:

               response = urllib.request.urlopen(request)

               response_body = response.read()

               response_body = response_body.decode('utf-8')

               response_body = json.loads(response_body)


               # 写入文件
               self.file_handler.write(str(response_body['code']))

               self.file_handler.write('\n')


               self.run_times = self.run_times + 1 # 记录每个线程的运行次数
               print('已经执行%s次请求' % str(self.run_times))

           except Exception as e:

               print('请求错误%s' % e)


def analyze(test_result_file_list):

   list_data = []       # 存放目标数据
   total_line_count = 0  # 读取的文本行数
   abnormal_line = 0    # 存放异常数据行数
   digit_line = 0       # 存放正确数据函数

   for file in test_result_file_list:

       with  open(file, 'r') as file:

           line = file.readline()

           while line:

               line = line.strip('\n')

               if line.isdigit() and len(line) == 12:

                   list_data.append(int(line))

                   digit_line = digit_line + 1

               else:

                   abnormal_line = abnormal_line + 1

                   print('服务器返回数据异常')


               line = file.readline()

               total_line_count = total_line_count + 1



   print('读取的总行数:%s' % str(total_line_count))

   print('数据正确的行数:%s' % str(digit_line))

   print('数据异常的行数:%s' % str(abnormal_line))


   # 分析是否存在重复数据
   set_data = set(list_data)

   if len(set_data) == len(list_data):

       print('不存在重复数据, 总数:%s 条' % len(list_data))

   else:

       print('有重复数据,重复数据:%s条' % (len(list_data) - len(set_data)))


   # 获取重复数据
   filehaneder = open('d:\\repeat_data.txt', 'w')

   c = Counter(list_data)

   for item in c.items():

       if item[1] > 1:

           print('重复数据:%s' % item[0])

           filehaneder.write(str(item[0]))

           filehaneder.write('\n')

   filehaneder.close()



if __name__ == '__main__':

   start_time = datetime.datetime.now()

   base_filename = 'test_result_data'
   base_dirname = 'd:\\result\\'
   test_result_file_list = [] # 存储结果数据文件名
   sub_thread_file_list = [] # 每个线程的文件句柄

   threads_pool = []  # 线程池,存放线程对象
   thread_num = 0  # 记录创建的线程数量

   while thread_num < int(sys.argv[1]):

       filename = base_dirname + base_filename + str(thread_num + 1) + '.txt'
       test_result_file_list.append(filename)

       file =  open(filename, 'w')

       sub_thread_file_list.append(file)


       thread_obj = SubThread(file)

       threads_pool.append(thread_obj)

       thread_num = thread_num + 1


   for thread in threads_pool:

       thread.start()


   # # 利用主线程执行清理工作
   current_active_thread_num = len(threading.enumerate()) # 获取当前活动线程数量
   while  current_active_thread_num != 1:

       time.sleep(300)
       current_active_thread_num = len(threading.enumerate())


   # 清理工作
   try:

       for file in sub_thread_file_list:

           file.close()

   except Exception as e:

       print('关闭文件出错%s' % e)

   end_time = datetime.datetime.now()

   print('运行耗时:',end_time - start_time)

   # 分析数据
   analyze(test_result_file_list)

运行结果:

目录
相关文章
|
2天前
|
数据采集 存储 Java
高德地图爬虫实践:Java多线程并发处理策略
高德地图爬虫实践:Java多线程并发处理策略
|
5天前
|
机器学习/深度学习 算法 数据挖掘
PYTHON银行机器学习:回归、随机森林、KNN近邻、决策树、高斯朴素贝叶斯、支持向量机SVM分析营销活动数据|数据分享-2
PYTHON银行机器学习:回归、随机森林、KNN近邻、决策树、高斯朴素贝叶斯、支持向量机SVM分析营销活动数据|数据分享
28 1
|
1天前
|
新零售 分布式计算 数据可视化
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
数据分享|基于Python、Hadoop零售交易数据的Spark数据处理与Echarts可视化分析
|
2天前
|
人工智能 安全 Java
Python 多线程编程实战:threading 模块的最佳实践
Python 多线程编程实战:threading 模块的最佳实践
13 5
|
4天前
|
Java 数据库连接 数据处理
Python从入门到精通:3.1.2多线程与多进程编程
Python从入门到精通:3.1.2多线程与多进程编程
|
4天前
|
机器学习/深度学习 算法 vr&ar
PYTHON用时变马尔可夫区制转换(MARKOV REGIME SWITCHING)自回归模型分析经济时间序列
PYTHON用时变马尔可夫区制转换(MARKOV REGIME SWITCHING)自回归模型分析经济时间序列
16 4
|
4天前
|
机器学习/深度学习 算法 Python
数据分享|Python决策树、随机森林、朴素贝叶斯、KNN(K-最近邻居)分类分析银行拉新活动挖掘潜在贷款客户
数据分享|Python决策树、随机森林、朴素贝叶斯、KNN(K-最近邻居)分类分析银行拉新活动挖掘潜在贷款客户
23 4
|
4天前
|
API vr&ar Python
Python 用ARIMA、GARCH模型预测分析股票市场收益率时间序列(上)
Python 用ARIMA、GARCH模型预测分析股票市场收益率时间序列
32 5
|
4天前
|
监控 Python
Python监控主机是否存活,并发报警邮件
Python监控主机是否存活,并发报警邮件
|
9天前
|
安全 Java
深入理解 Java 多线程和并发工具类
【4月更文挑战第19天】本文探讨了Java多线程和并发工具类在实现高性能应用程序中的关键作用。通过继承`Thread`或实现`Runnable`创建线程,利用`Executors`管理线程池,以及使用`Semaphore`、`CountDownLatch`和`CyclicBarrier`进行线程同步。保证线程安全、实现线程协作和性能调优(如设置线程池大小、避免不必要同步)是重要环节。理解并恰当运用这些工具能提升程序效率和可靠性。

热门文章

最新文章