往期文章:
多组件的Pipeline技术结构
复杂的事情一般不会一下子做完,而是会分成很多中间步骤一步步完成。
生产者消费者爬虫的架构
多线程数据通信的queue.Queue
在实现生产者消费者模式之前,了解一下多线程数据通信queue。queue.Queue可以用于多线程之间的,线程安全的数据通信。
#1、导入类库
import queue
# 2、创建Queue
q = queue.queue()
#3、添加元素
q.put(item)
#4、获取元素
item = q.get(item)
#5、查询状态
#查询元素多少
q.qsize()
#判断是否为空
q.empty()
#判断是否已满
q.full()
代码实现
假如现在要实现这样的一个需求,现在要提前博客园的文章的标题和链接,这时,需要在cnblogs_spider
添加一个函数parse
import requests
from bs4 import BeautifulSoup
urls = [
"https://www.cnblogs.com/#p{}".format(page)
for page in range(1,51)
]
def craw(url):
r =requests.get(url)
return r.text
def parse(html):
soup = BeautifulSoup(html,"html.parser")
links = soup.find_all('a',class_="post-item-title")
return [ (link['href'],link.get_text()) for link in links]
新建一个文件名为producer_consumer_spider.py
,在这个文件中新增两个函数,内容如下:
import queue
import cnblogs_spider
from loguru import logger
import threading
# 生产者生产任务
def do_craw(url_queue:queue.Queue,html_queue:queue.Queue):
while True:
url = url_queue.get()
html = cnblogs_spider.craw(url)
logger.info("生产者:{},爬取的连接是:{},url_queue.size={}".format(
threading.current_thread().name,
url,
url_queue.qsize()
))
html_queue.put(html)
# 消费者消费任务
def do_parse(html_queue:queue.Queue,fout):
while True:
html = html_queue.get()
results = cnblogs_spider.parse(html)
for item in results:
logger.info("item是:{}".format(item))
fout.write(str(item) + '\n')
logger.info("消费者:{},results.size:{},html_queue.size={}".format(
threading.current_thread().name,
len(results),
html_queue.qsize()
))
if __name__ == "__main__":
url_queue = queue.Queue()
html_queue = queue.Queue()
for url in cnblogs_spider.urls:
url_queue.put(url)
# 生产者开启3个线程
for id in range(3):
task = threading.Thread(target=do_craw,args=(url_queue,html_queue),name="craw{}".format(id))
task.start()
# 消费者开启2个线程
# 把消费的任务写到文件中
fout = open("./02.data.txt","w")
for id in range(2):
task = threading.Thread(target=do_parse,args=(html_queue,fout),name="parse{}".format(id))
task.start()