分布式单词发音抓取机器人

简介:

摘要

    网络编程实验课程要求必须写一个套接字的应用程序,考虑到之前写过的单词发音抓取程序的效率比较低下,就顺便结合套接字做一个分布式的抓取软件。其中涉及到动态任务领取,负载均衡,多线程,加锁解锁,简单的HTML代码解析,文件读写等功能。程序还是使用Python完成,对于学习Python、套接字编程、分布式编程甚至集群编程都有一定的意义。

    另外,此软件具有一定的攻击性,如果启动的从节点数量过多,并且每个节点上启动的线程数量过大,那么提供单词发音的服务器可能承受不住压力。所以,此软件对于编写web服务器的负载压力测试有一定的参考意义。

        下面是系统结构图:

         

Chaos Lee

                                                   图1 系统结构图

         

 

1为系统结构图,整个系统由主节点和从节点两大部分构成,主节点上有四种不同类型的线程,分别为:

l  任务侦听线程:负责接受从节点上的任务请求线程,需要注意的是,任务侦听线程并不负责和请求线程通信,而是启动一个新的单词分发线程负责和请求线程通信。

l  单词分发线程池:为从节点上的请求线程服务的主节点上的线程的集合。

l  文件侦听线程:负责接受从节点上的数据回传请求,需要注意的是,文件侦听线程并不负责和请求线程通信,而是启动一个新的写磁盘线程和请求线程通信。

l  写磁盘线程:为从节点上的请求线程服务的主节点上的线程的集合。

客户端也是使用并发的方式实现,一共有以下三类线程:

l  请求线程:当单词队列小于一定的阈值时,请求线程负责向主节点的任务侦听线程请求任务。

l  下载线程池,负责现在单词音频文件的线程的集合

l  文件回传线程:负责将写入本地磁盘上的文件回传到主节点上的线程

另外,系统中还有几个关键的数据结构:

l  主节点上的全部单词列表,其中存储了所有的需要下载发音文件对应的单词,该数据结构是一个队列。

l  从节点上的部分单词列表,其中存储了从节点上需要下载的部分单词,该数据结构是一个队列。

l  从节点上的已下载的mp3文件名称列表,其中存储了已经下载的mp3文件的路径,该数据结构为一个队列。 

       下面为具体的编码实现,仅为核心代码没有贴所有代码。如果有需要可以给我发邮件索取~

任务侦听线程


 
 
  1. class WordDispatchThread(threading.Thread): 
  2.  
  3.     def __init__(self,wordsList): 
  4.         threading.Thread.__init__(self) 
  5.         self.wordSocket = socket(AF_INET,SOCK_STREAM) 
  6.         self.wordSocket.bind(('',wordSocketPort)) 
  7.         self.wordSocket.listen(maxConnectNum) 
  8.         self.wordsList = wordsList 
  9.      
  10.     def run(self): 
  11.         while True
  12.             clientSock,address = self.wordSocket.accept() 
  13.             print 'connect from %s is established.' % str(address) 
  14.             wordWorker = WordServiceThread(clientSock,self.wordsList) 
  15.             wordWorker.start() 

     说明:在该类的构造函数中,将创建侦听套接字,并将全部单词列表作为类的参数成员。线程运行为一个死循环,其中启动accept方法,收到任务请求线程的请求之后,将启动任务分发线程为其服务。

           该线程的状态转换图如图2所示:

 

李超

   图2 任务侦听线程状态转换图

文件侦听线程


 
 
  1. class FileStoreThread(threading.Thread): 
  2.  
  3.     def __init__(self,wordsDownloaded): 
  4.         threading.Thread.__init__(self) 
  5.         self.fileSocket = socket(AF_INET,SOCK_STREAM) 
  6.         self.fileSocket.bind(('',fileSocketPort)) 
  7.         self.fileSocket.listen(maxConnectNum) 
  8.         self.wordsList = wordsDownloaded 
  9.  
  10.     def run(self): 
  11.         while True
  12.             clientSock,address = self.fileSocket.accept() 
  13.             print 'connect from %s is established.' % str(address) 
  14.             fileWorker = FileServiceThread(clientSock,self.wordsList) 
  15.             fileWorker.start() 
 说明:在该类的构造函数中,将创建侦听套接字,并将已下载的单词列表作为类的参数成员。线程运行为一个死循环,其中启动accept方法,收到任务请求线程的请求之后,将启动写磁盘线程为其服务。
  文件侦听线程状态转换图如图3所示:
   

李超

 图3 文件侦听线程的状态转换图

任务分发线程


 
 
  1. class WordServiceThread(threading.Thread): 
  2.      
  3.     def __init__(self,clientSocket,wordsList):   
  4.         threading.Thread.__init__(self) 
  5.         self.clientSocket = clientSocket 
  6.         self.wordsList = wordsList 
  7.  
  8.     def run(self): 
  9.         global wordsListMutex 
  10.         while True
  11.             request = self.clientSocket.recv(maxWordLength) 
  12.             requestStr = request.decode('ascii'
  13.             if(requestStr == 'please'): 
  14.                 tmpWordsList = '' 
  15.                 wordsListMutex.acquire() 
  16.                 for i in range(0,taskGrain): 
  17.                     tmpWord = self.wordsList.front() 
  18.                     tmpWordsList = tmpWordsList + ' ' + tmpWord 
  19.                 wordsListMutex.release() 
  20.                 self.clientSocket.send(tmpWordsList) 
  21.             else
  22.                 self.clientSocket.close() 
  23.                 break 

 说明:在该类的构造函数中得到为请求线程服务的套接字,以及全部单词列表的队列。在run中执行任务发送协议。首先受到一个请求字符串,应该为‘please’,否则表示应该在服务器端关闭为其服务的套接字。

 收到please之后,从所有单词队列中取出一定数量的单词拼成一个字符串发给送从节点的请求线程。

 该线程的执行逻辑对应的流程图如图4所示:

 

李超

图4 任务分发线程

写磁盘线程


 
 
  1. class FileServiceThread(threading.Thread): 
  2.     global soundQueueMutex 
  3.     def __init__(self,clientSocket,wordsDownloaded): 
  4.         threading.Thread.__init__(self) 
  5.         self.socket = clientSocket 
  6.         self.wordsDownloaded = wordsDownloaded 
  7.  
  8.     def run(self): 
  9.         while True
  10.             rawSaveName = self.socket.recv(maxWordLength)        
  11.             word = rawSaveName.decode('ascii'
  12.             if(word == ''): 
  13.                 continue 
  14.             if(word == '0'): 
  15.                 break 
  16.             saveName = word + '.mp3' 
  17.             outfd = open(saveName,'wb')  
  18.             helloPacket = 'OK' 
  19.             helloPacketStr = helloPacket.encode('ascii'
  20.             self.socket.send(helloPacketStr) 
  21.             while True
  22.                 data = self.socket.recv(dataBlockSize) 
  23.                 outfd.write(data) 
  24.                 query = 'any more?' 
  25.                 self.socket.send(query.encode('ascii')) 
  26.                 answer = self.socket.recv(maxWordLength) 
  27.                 answer = answer.decode('ascii'
  28.                 if(answer == 'yes'): 
  29.                     continue 
  30.                 else
  31.                     break 
  32.             print 'received %s successfully...' % saveName 
  33.             outfd.close() 
 说明:在构造函数中获取到为客户端文件回写线程服务的套接字,通过该套接字执行数据回写协议,将数据写回到磁盘上。
 写磁盘线程的执行逻辑对应的流程图如图图5所示:
 

图5 写磁盘线程

任务请求线程


 
 
  1. class TaskRequestThread(threading.Thread): 
  2.     global wordQueueMutex 
  3.  
  4.     def __init__(self,wordQueue): 
  5.         threading.Thread.__init__(self) 
  6.         self.requestSocket = socket(AF_INET,SOCK_STREAM) 
  7.         self.taskQueue = wordQueue 
  8.         self.requestSocket.connect(('localhost',wordSocketPort)) 
  9.  
  10.     def run(self): 
  11.         while True
  12.             wordQueueMutex.acquire() 
  13.             if(len(self.taskQueue) < downloadWokerNum ): 
  14.                 requestStr = 'please' 
  15.                 self.requestSocket.send(requestStr.encode('ascii')) 
  16.                 rawStr = self.requestSocket.recv(rawStrLen)     #group 10 words once 
  17.                 wordsStr = rawStr.decode('ascii'
  18.                 wordsList = wordsStr.split() 
  19.                 for word in wordsList: 
  20.                     self.taskQueue.append(word) 
  21.             exit = True 
  22.             for word in self.taskQueue: 
  23.                 if word != '0'
  24.                     exit = False 
  25.             if(exit == True): 
  26.                 self.requestSocket.send('0'
  27.                 self.requestSocket.close() 
  28.                 wordQueueMutex.release() 
  29.                 break 
  30.             wordQueueMutex.release() 
  31.             time.sleep(taskRequestWorkerSleep) 
 
说明:请求线程定期扫描任务队列中任务的数量,如果小于一定的阈值,则从主节点上请求新的单词列表任务。否则,线程进入睡眠。如果当前队列中的所有单词都为‘0’,说明主节点上已经没有单词任务了,这时可以退出了。
任务请求线程对应的执行逻辑如图6所示:
 

图6 任务请求线程

下载线程


 
 
  1. class DownloadThread(threading.Thread): 
  2.     global wordQueueMutex 
  3.     global fileQueueMutex 
  4.  
  5.     def __init__(self,wordQueue,fileQueue): 
  6.         threading.Thread.__init__(self) 
  7.         self.wordQueue = wordQueue 
  8.         self.fileQueue = fileQueue 
  9.      
  10.     def run(self): 
  11.         while True
  12.             time.sleep(2)           #delete this when presenting a demostration 
  13.             word = '' 
  14.             wordQueueMutex.acquire()             
  15.             while (len(self.wordQueue) == 0): 
  16.                 wordQueueMutex.release() 
  17.                 time.sleep(downloadWorkerSleep) 
  18.                 wordQueueMutex.acquire() 
  19.             if(self.wordQueue[0]=='0'): 
  20.                 wordQueueMutex.release() 
  21.                 fileQueueMutex.acquire() 
  22.                 self.fileQueue.append('0'
  23.                 fileQueueMutex.release() 
  24.                 break; 
  25.             else
  26.                 word = self.wordQueue.pop(0) 
  27.                 wordQueueMutex.release() 
  28.                 url = "http://www.dwds.de/?qu="+word 
  29.                 urlContent = urllib2.urlopen(url).read() 
  30.                 #print urlContent 
  31.                 urlList = re.findall('filename=http://media.dwds.de/dwds/media/sound/dwdswb_aussprache/.*\.mp3', urlContent) 
  32.                 try: 
  33.                     finalUrl = urlList[0][9:] 
  34.                     #print finalUrl 
  35.                     soundData = urllib2.urlopen(finalUrl).read()  
  36.                                 saveName=word+'.mp3'+'.local'  
  37.                     #print saveName 
  38.                                 outfd = open(saveName,'wb')  
  39.                                 outfd.write(soundData)  
  40.                                 outfd.close() 
  41.                     fileQueueMutex.acquire() 
  42.                     self.fileQueue.append(word)                  
  43.                     fileQueueMutex.release() 
  44.                     print '%s:              OK' % word 
  45.                 except
  46.                     print '%s:              FAILED' % word 
  47.                                 finally: 
  48.                     pass 

 说明:下载线程在构造函数中就获得了任务列表队列和已下载文件队列,其处理过程前面已经叙述过了。首先从当前任务队列中取出一个单词,如果单词为’0’,表示已经没有任务了,这时线程退出。否则,就需要去构建单词页面的URL,然后分析页面的HTML代码,使用正则表达式找到单词音频文件的URL,接着将数据读入内存并写入磁盘。另外注意的是,下载线程在退出的时候会给已下载单词队列中写入’0’,以通知回传线程退出。

下载线程的执行逻辑对应的流程图如图7所示:

 

李超

图7 下载线程

文件回传线程


 
 
  1. class FileTransferThread(threading.Thread): 
  2.     global fileQueueMutex 
  3.  
  4.     def __init__(self,fileQueue): 
  5.         threading.Thread.__init__(self) 
  6.         self.fileQueue = fileQueue; 
  7.         self.fileSocket = socket(AF_INET,SOCK_STREAM) 
  8.         self.fileSocket.connect(('localhost',fileSocketPort)) 
  9.         self.exitCounter = 0 
  10.  
  11.     def run(self): 
  12.         while True
  13.             fileQueueMutex.acquire() 
  14.             while(len(self.fileQueue)==0): 
  15.                 fileQueueMutex.release() 
  16.                 time.sleep(fileWorkerSleep) 
  17.                 fileQueueMutex.acquire() 
  18.             word = self.fileQueue.pop(0) 
  19.             fileQueueMutex.release() 
  20.             if(word == '0'): 
  21.                 self.exitCounter = self.exitCounter + 1 
  22.                 if (self.exitCounter == downloadWokerNum): 
  23.                     self.fileSocket.send(word.encode('ascii')) 
  24.                     self.fileSocket.close() 
  25.                     break 
  26.                 else
  27.                     continue 
  28.             self.fileSocket.send(word.encode('ascii')) 
  29.             response = self.fileSocket.recv(helloLength) 
  30.             responseStr = response.decode('ascii'
  31.             if(responseStr != 'OK'): 
  32.                 self.fileSocket.close() 
  33.                 continue 
  34.             saveName = word + '.mp3' + '.local' 
  35.             infd = open(saveName,'rb'
  36.             data = infd.read(dataBlockSize) 
  37.             while True:              
  38.                 self.fileSocket.send(data) 
  39.                 query = self.fileSocket.recv(maxWordLength) 
  40.                 answer = '' 
  41.                 data = infd.read(dataBlockSize)              
  42.                 if not data: 
  43.                     answer = 'no' 
  44.                     self.fileSocket.send(answer.encode('ascii')) 
  45.                     break 
  46.                 else
  47.                     answer = 'yes' 
  48.                     self.fileSocket.send(answer.encode('ascii')) 
  49.             infd.close() 
 
说明:回传线程在构造函数中获得了已下载的单词的队列。在运行的过程中,首先判断当前的已下载单词队列中是否有文件名,如果有则立即回传数据。下载线程在退出的时候会给已下载单词队列中写入’0’,以通知回传线程退出。回传线程会统计获得的0的数量,如果统计的数量等于下载线程的数量,表示下载线程全部退出,同时文件已经回传完毕。这时,回传线程也可以退出了。
文件回传线程的执行逻辑对应的流程图如图8所示:
 

李超

图8 文件回传线程

单词列表类


 
 
  1. class WordsList: 
  2.  
  3.     def __init__(self): 
  4.         if(len(sys.argv)<2): 
  5.             print 'Usage: %s filename' % sys.argv[0] 
  6.             sys.exit(-1)     
  7.         filePath = sys.argv[1] 
  8.         self.t = [] 
  9.         try: 
  10.             for line in fileinput.input(filePath): 
  11.                 wordLen = len(line) 
  12.                 if( wordLen > 1  and  line[wordLen-1] == '\n'): 
  13.                     word = line[0:wordLen-1] 
  14.                     self.t.append(word) 
  15.                 else
  16.                     self.t.append(line) 
  17.             self.t.append('0'
  18.         except
  19.             print 'constructing words list error.' 
  20.             print 'maybe the provided file path is wrong.Check it twice.' 
  21.             sys.exit(-2) 
  22.         finally: 
  23.             pass 
  24.         print 'constructing words successfully...' 
  25.              
  26.     def front(self): 
  27.         if(self.t[0]!='0'): 
  28.             return self.t.pop(0) 
  29.         else
  30.             return self.t[0] 
说明:该类在构造函数中,从命令行上提供的文件名中解析出所有的单词,并存放在一个列表中,在最后加入一个’0’表示已经没有单词了。此类中还提供一个方法--front,用于从单词队列中获取队首的单词,如果没有单词的话,返回为’0’。这一点非常重要,用于控制从节点线程的结束。

总结

今年一月份的时候,准备背诵德语四级单词,但是很多单词发音记不清楚,所以找到了一个单词发音网站。当时,想将这些音频文件存放在我的mp3播放器中,这样我可以随时随地的背诵记忆了,所以用Python写了一个简单的单词发音抓取程序,但是抓取效率不是很高,速度比较慢。而我的研究方向为高性能计算,现在主要是集群计算,所以结合网络课程上要求的套接字网络编程实验和我的研究方向,完成了此软件的开发。

该软件的目标很明确,就是以服务器能否提供的最大负载来下载单词对应的音频文件。此软件有一定的攻击性,如果这个软件可以通过因特网传播,并在后台隐秘的运行,则完全可以使提供单词下载的网站瘫痪。

在实际编程中,遇到了一些比较棘手的问题。特别是文件传输过程,虽然可以使用ftp协议传输,但是为了保持软件的最小依赖性,还是实现了一个简单的文件传输协议。因为Karn算法的原因,有些字节被缓冲在缓存中导致接受过程混乱等问题都是之前没有遇到过的。这些问题通过自定义的一些简单的协议得以解决。

之前还想添加一个语音提示的功能,在某个单词下载完毕之后,语音提示该单词已经下载完成。但考虑到多节点下载时单词发音反而会成为瓶颈,所以放弃了这个功能的实现。

此软件仅仅作为一个演示软件,并没有在实际的集群上运行过。软件的实际运行环境还是做了很多的假设,并不能保证深度测试的时候不出现故障。

关于软件的并发,由于希望此软件能够运行在集群上,所以就必须考虑负载均衡的问题。这里使用主从模式,服务器作为主节点,其他节点作为从节点。为了做到负载均衡,使用动态任务分配的并行模式,每个从节点在没有任务的时候向主节点索取,而不是被动的从主节点接受。动态任务分配的模式可以保证各个节点都处于繁忙状态,最小化负载不均带来的问题。

 

 

 

 

本文转自hipercomer 51CTO博客,原文链接:http://blog.51cto.com/hipercomer/874996


相关文章
|
30天前
|
机器学习/深度学习 人工智能 自然语言处理
PeterCat:一键创建开源项目 AI 问答机器人,自动抓取 GitHub 仓库信息、文档和 issue 等构建知识库
PeterCat 是一款开源的智能答疑机器人,能够自动抓取 GitHub 上的文档和 issue 构建知识库,提供对话式答疑服务,帮助开发者和社区维护者高效解决技术问题。
128 7
PeterCat:一键创建开源项目 AI 问答机器人,自动抓取 GitHub 仓库信息、文档和 issue 等构建知识库
|
6月前
|
Web App开发 机器人
小白一学就会的 小红书全自动写文发文机器人-抓取爆款笔记(一)
小白一学就会的 小红书全自动写文发文机器人-抓取爆款笔记(一)
226 4
|
8月前
|
机器学习/深度学习 算法 机器人
论文介绍:使用仿真和领域适应提高深度机器人抓取效率
【5月更文挑战第11天】研究人员提出结合仿真数据和领域适应技术提升深度机器人抓取效率。通过在仿真环境中生成多样化抓取数据并使用GraspGAN和DANN进行像素级和特征级适应,使模型能在现实世界中更好地泛化。实验表明,这种方法能减少现实数据需求,同时保持高抓取性能。尽管面临物理差异和成功率挑战,该研究为机器人抓取技术的进步提供了新途径。论文链接:https://arxiv.org/abs/1709.07857
97 5
|
机器人 计算机视觉
首次机器人抓取云竞赛引国际学界广泛关注和参与
近日,阿里巴巴达摩院人工智能实验室与University of South Florida等国外著名研究机构共同举办了世界首次机器人抓取云竞赛:OCRTOC竞赛。OCRTOC竞赛聚焦于机器人抓取能力以及桌面物品整理的应用场景,旨在成为机器人抓取技术领域的ImageNet。OCRTOC竞赛获得了国际电气电子工程师协会两大技术委员会的大力支持,并成为国际机器人顶会IROS2020的正式官方赛事,吸引了全球顶尖学府的关注!
391 0
首次机器人抓取云竞赛引国际学界广泛关注和参与
|
数据采集 供应链 机器人
Python - 抓取 iphone13 pro 线下店供货信息并发送到钉钉机器人,最后设置为定时任务
Python - 抓取 iphone13 pro 线下店供货信息并发送到钉钉机器人,最后设置为定时任务
463 0
Python - 抓取 iphone13 pro 线下店供货信息并发送到钉钉机器人,最后设置为定时任务
|
算法 机器人 开发工具
机器人真·涨姿势了:比肩人类抓取能力,上海交大、非夕科技联合提出全新方法AnyGrasp
在近日召开的 ICRA (国际机器人与自动化会议)大会上,上海交大-非夕科技联合实验室展示了最新研究成果「AnyGrasp」(https://graspnet.net/anygrasp.html),第一次实现机器人对于任意场景的任意物体的通用高速抓取,在机械臂硬件构型、相机不作限制的情况下,让机器人拥有比肩人类抓取能力的可能。
841 0
机器人真·涨姿势了:比肩人类抓取能力,上海交大、非夕科技联合提出全新方法AnyGrasp
|
数据采集 前端开发 搜索推荐
20、 Python快速开发分布式搜索引擎Scrapy精讲—编写spiders爬虫文件循环抓取内容—meta属性返回指定值给回调函数—Scrapy内置图片下载器
编写spiders爬虫文件循环抓取内容 Request()方法,将指定的url地址添加到下载器下载页面,两个必须参数,  参数:  url='url'  callback=页面处理函数  使用时需要yield Request() parse.
1329 0
|
存储 前端开发 NoSQL
Python分布式抓取和分析京东商城评价
互联网购物现在已经是非常普遍的购物方式,在互联网上购买商品并且使用之后,很多人都会回过头来对自己购买的商品进行一些评价,以此来表达自己对于该商品使用后的看法。商品评价的好坏对于一个商品的重要性显而易见,大部分消费者都以此作为快速评判该商品质量优劣的方式。
1540 0
|
Web App开发 数据采集 NoSQL
Scrapy-Redis分布式抓取麦田二手房租房信息与数据分析
试着通过抓取一家房产公司的全部信息,研究下北京的房价。文章最后用Pandas进行了分析,并给出了数据可视化。 准备工作 麦田房产二手房页面(http://bj.maitian.cn/esfall/PG1)。
1297 0

热门文章

最新文章