一个使用 asyncio 协程的网络爬虫(三)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介:

使用协程

我们将从描述爬虫如何工作开始。现在是时候用 asynio 去实现它了。

我们的爬虫从获取第一个网页开始,解析出链接并把它们加到队列中。此后它开始傲游整个网站,并发地获取网页。但是由于客户端和服务端的负载限制,我们希望有一个最大数目的运行的 worker,不能再多。任何时候一个 worker 完成一个网页的获取,它应该立即从队列中取出下一个链接。我们会遇到没有那么多事干的时候,所以一些 worker 必须能够暂停。一旦又有 worker 获取一个有很多链接的网页,队列会突增,暂停的 worker 立马被唤醒干活。最后,当任务完成后我们的程序必须马上退出。

假如你的 worker 是线程,怎样去描述你的爬虫算法?我们可以使用 Python 标准库中的同步队列。每次有新的一项加入,队列增加它的 “tasks” 计数器。线程 worker 完成一个任务后调用 task_done。主线程阻塞在Queue.join,直到“tasks”计数器与 task_done 调用次数相匹配,然后退出。

协程通过 asyncio 队列,使用和线程一样的模式来实现!首先我们导入它


 
 
  1. try:
  2. from asyncio import JoinableQueue as Queue
  3. except ImportError:
  4. # In Python 3.5, asyncio.JoinableQueue is
  5. # merged into Queue.
  6. from asyncio import Queue

我们把 worker 的共享状态收集在一个 crawler 类中,主要的逻辑写在 crawl 方法中。我们在一个协程中启动crawl,运行 asyncio 的事件循环直到 crawl 完成:


 
 
  1. loop = asyncio.get_event_loop()
  2. crawler = crawling.Crawler('http://xkcd.com',
  3. max_redirect=10)
  4. loop.run_until_complete(crawler.crawl())

crawler 用一个根 URL 和最大重定向数 max_redirect 来初始化,它把 (URL, max_redirect) 序对放入队列中。(为什么要这样做,请看下文)


 
 
  1. class Crawler:
  2. def __init__(self, root_url, max_redirect):
  3. self.max_tasks = 10
  4. self.max_redirect = max_redirect
  5. self.q = Queue()
  6. self.seen_urls = set()
  7. # aiohttp's ClientSession does connection pooling and
  8. # HTTP keep-alives for us.
  9. self.session = aiohttp.ClientSession(loop=loop)
  10. # Put (URL, max_redirect) in the queue.
  11. self.q.put((root_url, self.max_redirect))

现在队列中未完成的任务数是 1。回到我们的主程序,启动事件循环和 crawl 方法:


 
 
  1. loop.run_until_complete(crawler.crawl())

crawl 协程把 worker 们赶起来干活。它像一个主线程:阻塞在 join 上直到所有任务完成,同时 worker 们在后台运行。


 
 
  1. @asyncio.coroutine
  2. def crawl(self):
  3. """Run the crawler until all work is done."""
  4. workers = [asyncio.Task(self.work())
  5. for _ in range(self.max_tasks)]
  6. # When all work is done, exit.
  7. yield from self.q.join()
  8. for w in workers:
  9. w.cancel()

如果 worker 是线程,可能我们不会一次把它们全部创建出来。为了避免创建线程的昂贵代价,通常一个线程池会按需增长。但是协程很廉价,我们可以直接把他们全部创建出来。

怎么关闭这个 crawler 很有趣。当 join 完成,worker 存活但是被暂停:他们等待更多的 URL,所以主协程要在退出之前清除它们。否则 Python 解释器关闭并调用所有对象的析构函数时,活着的 worker 会哭喊到:


 
 
  1. ERROR:asyncio:Task was destroyed but it is pending!

cancel 又是如何工作的呢?生成器还有一个我们还没介绍的特点。你可以从外部抛一个异常给它:


 
 
  1. >>> gen = gen_fn()
  2. >>> gen.send(None) # Start the generator as usual.
  3. 1
  4. >>> gen.throw(Exception('error'))
  5. Traceback (most recent call last):
  6. File "<input>", line 3, in <module>
  7. File "<input>", line 2, in gen_fn
  8. Exception: error

生成器被 throw 恢复,但是它现在抛出一个异常。如过生成器的调用堆栈中没有捕获异常的代码,这个异常被传递到顶层。所以注销一个协程:


 
 
  1. # Method of Task class.
  2. def cancel(self):
  3. self.coro.throw(CancelledError)

任何时候生成器暂停,在某些 yield from 语句它恢复并且抛出一个异常。我们在 task 的 step 方法中处理注销。


 
 
  1. # Method of Task class.
  2. def step(self, future):
  3. try:
  4. next_future = self.coro.send(future.result)
  5. except CancelledError:
  6. self.cancelled = True
  7. return
  8. except StopIteration:
  9. return
  10. next_future.add_done_callback(self.step)

现在 task 知道它被注销了,所以当它被销毁时,它不再抱怨。

一旦 crawl 注销了 worker,它就退出。同时事件循环看见这个协程结束了(我们后面会见到的),也就退出。


 
 
  1. loop.run_until_complete(crawler.crawl())

crawl 方法包含了所有主协程需要做的事。而 worker 则完成从队列中获取 URL、获取网页、解析它们得到新的链接。每个 worker 独立地运行 work 协程:


 
 
  1. @asyncio.coroutine
  2. def work(self):
  3. while True:
  4. url, max_redirect = yield from self.q.get()
  5. # Download page and add new links to self.q.
  6. yield from self.fetch(url, max_redirect)
  7. self.q.task_done()

Python 看见这段代码包含 yield from 语句,就把它编译成生成器函数。所以在 crawl 方法中,我们调用了 10 次 self.work,但并没有真正执行,它仅仅创建了 10 个指向这段代码的生成器对象并把它们包装成 Task 对象。task 接收每个生成器所 yield 的 future,通过调用 send 方法,当 future 解决时,用 future 的结果做为 send 的参数,来驱动它。由于生成器有自己的栈帧,它们可以独立运行,带有独立的局部变量和指令指针。

worker 使用队列来协调其小伙伴。它这样等待新的 URL:


 
 
  1. url, max_redirect = yield from self.q.get()

队列的 get 方法自身也是一个协程,它一直暂停到有新的 URL 进入队列,然后恢复并返回该条目。

碰巧,这也是当主协程注销 worker 时,最后 crawl 停止,worker 协程暂停的地方。从协程的角度,yield from 抛出CancelledError 结束了它在循环中的最后旅程。

worker 获取一个网页,解析链接,把新的链接放入队列中,接着调用task_done减小计数器。最终一个worker遇到一个没有新链接的网页,并且队列里也没有任务,这次task_done的调用使计数器减为0,而crawl正阻塞在join方法上,现在它就可以结束了。

我们承诺过要解释为什么队列中要使用序对,像这样:


 
 
  1. # URL to fetch, and the number of redirects left.
  2. ('http://xkcd.com/353', 10)

新的 URL 的重定向次数是10。获取一个特别的 URL 会重定向一个新的位置。我们减小重定向次数,并把新的 URL 放入队列中。


 
 
  1. # URL with a trailing slash. Nine redirects left.
  2. ('http://xkcd.com/353/', 9)

我们使用的 aiohttp 默认会跟踪重定向并返回最终结果。但是,我们告诉它不要这样做,爬虫自己来处理重定向,以便它可以合并那些目的相同的重定向路径:如果我们已经在 self.seen_urls 看到一个 URL,说明它已经从其他的地方走过这条路了。

Figure 5.4 - Redirects

Figure 5.4 - Redirects

crawler 获取“foo”并发现它重定向到了“baz”,所以它会加“baz”到队列和 seen_urls 中。如果它获取的下一个页面“bar” 也重定向到“baz”,fetcher 不会再次将 “baz”加入到队列中。如果该响应是一个页面,而不是一个重定向,fetch 会解析它的链接,并把新链接放到队列中。


 
 
  1. @asyncio.coroutine
  2. def fetch(self, url, max_redirect):
  3. # Handle redirects ourselves.
  4. response = yield from self.session.get(
  5. url, allow_redirects=False)
  6. try:
  7. if is_redirect(response):
  8. if max_redirect > 0:
  9. next_url = response.headers['location']
  10. if next_url in self.seen_urls:
  11. # We have been down this path before.
  12. return
  13. # Remember we have seen this URL.
  14. self.seen_urls.add(next_url)
  15. # Follow the redirect. One less redirect remains.
  16. self.q.put_nowait((next_url, max_redirect - 1))
  17. else:
  18. links = yield from self.parse_links(response)
  19. # Python set-logic:
  20. for link in links.difference(self.seen_urls):
  21. self.q.put_nowait((link, self.max_redirect))
  22. self.seen_urls.update(links)
  23. finally:
  24. # Return connection to pool.
  25. yield from response.release()

如果这是多进程代码,就有可能遇到讨厌的竞争条件。比如,一个 worker 检查一个链接是否在 seen_urls中,如果没有它就把这个链接加到队列中并把它放到 seen_urls 中。如果它在这两步操作之间被中断,而另一个 worker 解析到相同的链接,发现它并没有出现在 seen_urls 中就把它加入队列中。这(至少)导致同样的链接在队列中出现两次,做了重复的工作和错误的统计。

然而,一个协程只在 yield from 时才会被中断。这是协程比多线程少遇到竞争条件的关键。多线程必须获得锁来明确的进入一个临界区,否则它就是可中断的。而 Python 的协程默认是不会被中断的,只有它明确 yield 时才主动放弃控制权。

我们不再需要在用回调方式时用的 fetcher 类了。这个类只是不高效回调的一个变通方法:在等待 I/O 时,它需要一个存储状态的地方,因为局部变量并不能在函数调用间保留。倒是 fetch 协程可以像普通函数一样用局部变量保存它的状态,所以我们不再需要一个类。

当 fetch 完成对服务器响应的处理,它返回到它的调用者 workwork 方法对队列调用 task_done,接着从队列中取出一个要获取的 URL。

当 fetch 把新的链接放入队列中,它增加未完成的任务计数器,并停留在主协程,主协程在等待 q.join,处于暂停状态。而当没有新的链接并且这是队列中最后一个 URL 时,当 work 调用task_done,任务计数器变为 0,主协程从join` 中退出。

与 worker 和主协程一起工作的队列代码像这样(实际的 asyncio.Queue 实现在 Future 所展示的地方使用asyncio.Event 。不同之处在于 Event 是可以重置的,而 Future 不能从已解决返回变成待决。)


 
 
  1. class Queue:
  2. def __init__(self):
  3. self._join_future = Future()
  4. self._unfinished_tasks = 0
  5. # ... other initialization ...
  6. def put_nowait(self, item):
  7. self._unfinished_tasks += 1
  8. # ... store the item ...
  9. def task_done(self):
  10. self._unfinished_tasks -= 1
  11. if self._unfinished_tasks == 0:
  12. self._join_future.set_result(None)
  13. @asyncio.coroutine
  14. def join(self):
  15. if self._unfinished_tasks > 0:
  16. yield from self._join_future

主协程 crawl yield from join。所以当最后一个 worker 把计数器减为 0,它告诉 crawl 恢复运行并结束。

旅程快要结束了。我们的程序从 crawl 调用开始:


 
 
  1. loop.run_until_complete(self.crawler.crawl())

程序如何结束?因为 crawl 是一个生成器函数,调用它返回一个生成器。为了驱动它,asyncio 把它包装成一个 task:


 
 
  1. class EventLoop:
  2. def run_until_complete(self, coro):
  3. """Run until the coroutine is done."""
  4. task = Task(coro)
  5. task.add_done_callback(stop_callback)
  6. try:
  7. self.run_forever()
  8. except StopError:
  9. pass
  10. class StopError(BaseException):
  11. """Raised to stop the event loop."""
  12. def stop_callback(future):
  13. raise StopError

当这个任务完成,它抛出 StopError,事件循环把这个异常当作正常退出的信号。

但是,task 的 add_done_callbock 和 result 方法又是什么呢?你可能认为 task 就像一个 future,不错,你的直觉是对的。我们必须承认一个向你隐藏的细节,task 是 future。


 
 
  1. class Task(Future):
  2. """A coroutine wrapped in a Future."""

通常,一个 future 被别人调用 set_result 解决。但是 task,当协程结束时,它自己解决自己。记得我们解释过当 Python 生成器返回时,它抛出一个特殊的 StopIteration 异常:


 
 
  1. # Method of class Task.
  2. def step(self, future):
  3. try:
  4. next_future = self.coro.send(future.result)
  5. except CancelledError:
  6. self.cancelled = True
  7. return
  8. except StopIteration as exc:
  9. # Task resolves itself with coro's return
  10. # value.
  11. self.set_result(exc.value)
  12. return
  13. next_future.add_done_callback(self.step)

所以当事件循环调用 task.add_done_callback(stop_callback),它就准备被这个 task 停止。在看一次run_until_complete


 
 
  1. # Method of event loop.
  2. def run_until_complete(self, coro):
  3. task = Task(coro)
  4. task.add_done_callback(stop_callback)
  5. try:
  6. self.run_forever()
  7. except StopError:
  8. pass

当 task 捕获 StopIteration 并解决自己,这个回调从循环中抛出 StopError。循环结束,调用栈回到run_until_complete。我们的程序结束。

总结

现代的程序越来越多是 I/O 密集型而不是 CPU 密集型。对于这样的程序,Python 的线程在两个方面不合适:全局解释器锁阻止真正的并行计算,并且抢占切换也导致他们更容易出现竞争。异步通常是正确的选择。但是随着基于回调的异步代码增加,它会变得非常混乱。协程是一个更整洁的替代者。它们自然地重构成子过程,有健全的异常处理和栈追溯。

如果我们换个角度看 yield from 语句,一个协程看起来像一个传统的做阻塞 I/O 的线程。甚至我们可以采用经典的多线程模式编程,不需要重新发明。因此,与回调相比,协程更适合有经验的多线程的编码者。

但是当我们睁开眼睛关注 yield from 语句,我们能看到协程放弃控制权、允许其它人运行的标志点。不像多线程,协程展示出我们的代码哪里可以被中断哪里不能。在 Glyph Lefkowitz 富有启发性的文章“Unyielding”:“线程让局部推理变得困难,然而局部推理可能是软件开发中最重要的事”。然而,明确的 yield,让“通过过程本身而不是整个系统理解它的行为(和因此、正确性)”成为可能。

这章写于 Python 和异步的复兴时期。你刚学到的基于生成器的的协程,在 2014 年发布在 Python 3.4 的 asyncio 模块中。2015 年 9 月,Python 3.5 发布,协程成为语言的一部分。这个原生的协程通过“async def”来声明, 使用“await”而不是“yield from”委托一个协程或者等待 Future。

除了这些优点,核心的思想不变。Python 新的原生协程与生成器只是在语法上不同,工作原理非常相似。事实上,在 Python 解释器中它们共用同一个实现方法。Task、Future 和事件循环在 asynico 中扮演着同样的角色。

你已经知道 asyncio 协程是如何工作的了,现在你可以忘记大部分的细节。这些机制隐藏在一个整洁的接口下。但是你对这基本原理的理解能让你在现代异步环境下正确而高效的编写代码。

原文发布时间为:2017-03-06

本文来自云栖社区合作伙伴“Linux中国”

相关文章
|
1月前
|
搜索推荐 程序员 调度
精通Python异步编程:利用Asyncio与Aiohttp构建高效网络应用
【10月更文挑战第5天】随着互联网技术的快速发展,用户对于网络应用的响应速度和服务质量提出了越来越高的要求。为了构建能够处理高并发请求、提供快速响应时间的应用程序,开发者们需要掌握高效的编程技术和框架。在Python语言中,`asyncio` 和 `aiohttp` 是两个非常强大的库,它们可以帮助我们编写出既简洁又高效的异步网络应用。
131 1
|
16天前
|
Python
Python中的异步编程:使用asyncio和aiohttp实现高效网络请求
【10月更文挑战第34天】在Python的世界里,异步编程是提高效率的利器。本文将带你了解如何使用asyncio和aiohttp库来编写高效的网络请求代码。我们将通过一个简单的示例来展示如何利用这些工具来并发地处理多个网络请求,从而提高程序的整体性能。准备好让你的Python代码飞起来吧!
39 2
|
1月前
|
调度 Python
python知识点100篇系列(20)-python协程与异步编程asyncio
【10月更文挑战第8天】协程(Coroutine)是一种用户态内的上下文切换技术,通过单线程实现代码块间的切换执行。Python中实现协程的方法包括yield、asyncio模块及async/await关键字。其中,async/await结合asyncio模块可更便捷地编写和管理协程,支持异步IO操作,提高程序并发性能。协程函数、协程对象、Task对象等是其核心概念。
|
2月前
|
调度 开发者 Python
探索Python中的异步编程:理解asyncio和协程
【9月更文挑战第22天】在现代软件工程中,异步编程是提升应用性能的关键技术之一。本文将深入探讨Python语言中的异步编程模型,特别是asyncio库的使用和协程的概念。我们将了解如何通过事件循环和任务来处理并发操作,以及如何用协程来编写非阻塞的代码。文章不仅会介绍理论知识,还会通过实际的代码示例展示如何在Python中实现高效的异步操作。
|
2月前
|
数据采集
爬虫之协程异步 asyncio和aiohttp
爬虫之协程异步 asyncio和aiohttp
|
2月前
|
开发者 Python
探索Python中的异步编程:理解Asyncio和协程
【9月更文挑战第18天】在Python的世界中,异步编程是一个强大而神秘的概念。它像是一把双刃剑,掌握得好可以大幅提升程序的效率和性能;使用不当则可能让代码变得难以维护和理解。本文将带你一探究竟,通过深入浅出的方式介绍Python中asyncio库和协程的基本概念、使用方法及其背后的原理,让你对异步编程有一个全新的认识。
|
3月前
|
大数据 API 调度
Python中的异步编程:理解asyncio模块与协程
在现代编程中,异步编程越来越重要,特别是在处理大规模并发任务时。Python的asyncio模块提供了强大的工具来实现异步操作,其中协程是其核心机制之一。本文将深入探讨asyncio模块的基本概念、如何编写和管理异步任务,以及协程的工作原理和实际应用。
|
4月前
|
存储 调度 Python
异步编程概述在 Python中,`asyncio`库提供了对异步I/O、事件循环、协程(coroutine)和任务的支持。
异步编程概述在 Python中,`asyncio`库提供了对异步I/O、事件循环、协程(coroutine)和任务的支持。
|
4天前
|
安全 网络安全 数据安全/隐私保护
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
在数字化时代,网络安全和信息安全已成为我们生活中不可或缺的一部分。本文将介绍网络安全漏洞、加密技术和安全意识等方面的知识,并提供一些实用的技巧和建议,帮助读者更好地保护自己的网络安全和信息安全。
|
3天前
|
安全 算法 网络安全
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
在当今数字化时代,网络安全和信息安全已经成为了全球关注的焦点。随着技术的发展,网络攻击手段日益狡猾,而防范措施也必须不断更新以应对新的挑战。本文将深入探讨网络安全的常见漏洞,介绍加密技术的基本概念和应用,并强调培养良好安全意识的重要性。通过这些知识的分享,旨在提升公众对网络安全的认识,共同构建更加安全的网络环境。
下一篇
无影云桌面