翻译自开源书籍500 Lines or Less中A Web Crawler With asyncio Coroutines一节。
A Web Crawler With asyncio Coroutines - A. Jesse Jiryu Davis and Guido van Rossum
第一部分见
翻译 | 500 Lines or Less 基于Python协程的网页爬虫 (一)
用生成器构建协程
Building Coroutines With Generators
所以一个生成器可以暂停,可以用一个值恢复,并且它有一个返回值。听起来是一个很好的特性,可以在不使用面条式的回调函数的情况下编写一个异步I/O模块。我们的目的是构建一个“协程”:一个可以在程序中与其它例程合作调度的例程。我们的协程是Python标准库”asynico”中协程的一个简化版本。就像在asyncio库中一样,我们会使用生成器,futures类,和”yield from”语句。
首先,我们需要一个方法来表示一个协程正在等待的future类事件,一个简化版本为:
1 | class Future: |
一个future被初始化为“未解决的”,他通过调用set_result
来解决。
让我们用futures类和协程来改写我们的抓取器。先使用回调函数编写一个fetch:
1 | class Fetcher: |
fetch
方法从连接一个socket开始,然后注册一个回调函数connected
,当socket准备好的时候执行connected
。现在,我们可以把两个步骤整合在一个协程里面:
1 | def fetch(self): |
现在fetch
已经是一个生成器函数,相比于普通的函数,它包含了一个yield
语句。我们创建一个未定的future,然后yield它,它会暂停直到socket连接建立。内建函数on_connected
解决了这个future。
但是当future解决的时候,谁来回复这个生成器?我们需要一个协程驱动。把它称为”task”:
1 | class Task: |
task向fetch
生成器发送None
来启动它。fetch
运行到它yield出一个future,task捕获它命名为next_future
。当socket建立的时候,事件循环启动on_connected
回调函数,它会解决future,调用step
,恢复fetch
。
用yield from
重构协程
Factoring Coroutines With yield from
一旦socket的连接建立,我们向服务器发送一个HTTP GET请求,然后读取服务器的响应。这些步骤不需要分散在回调函数中;我们把它整合在同一个生成器函数里面:
1 | def fetch(self): |
这种从socket读取全部消息的代码看起来是可以起作用的。那么我们如何把它分解为一个字程序呢?现在Python著名的yield from
语句登场了。它将一个生成器委派给另一个生成器,
为了看清楚原理,我们回到我们那个简单的生成器例子:
1 | def gen_fn(): |
为了在另一个生成器中调用这个生成器,用yield from
委派它:
1 | # Generator function: |
caller
生成器的行为就好像是gen
一样,这个生成器被委派给:
1 | None) caller.send( |
当caller
从gen
被委派(yield from),caller
就不再前进。注意到指令指针仍然停留在15,也就是yield from
语句的位置,即使内部的生成器从一个yield语句运行到下一个,指针的位置仍然不变。从我们的角度看caller
,不能区分它yield出的值是来自caller
本身还是它委派的生成器。从gen
往外看,我们不能区分它接收到的值是来自caller
还是外面。yield from
语句就像是一个光滑的管道,值通过这个管道从gen
流入流出,直到gen
结束。
一个协程可以使用yield from
语句将任务委派给子协程,然后从中接收任务的结果。注意到上面的执行过程,caller
打印了”return value of yield-from: done”。当gen
执行完毕,它的返回值会变成caller
中的yield from
语句的返回值。
1 | rv = yield from gen |
在上面,我们批判了基于回掉函数的异步编程时,最大的不满是关于“堆栈撕裂”:当一个回调抛出一个异常,它的堆栈回溯通常是没有用的。它仅仅显示了事件循环正在运行一个回调,而没有说明为什么。在这个问题上协程表现如何呢?
1 | def gen_fn(): |
这样有用多了!堆栈回溯显示了发生错误的时候caller_fn
正在委派给gen_fn
。令人欣慰的是,你可以在一直异常处理器中封装这个调用给子协程,就像我们在正常的子程序中做的那样。
1 | def gen_fn(): |
所以我们可以像常规子过程一样提取子协程。让我们从fetcher中提取一些有用的子协程。我们先写一个读取一块数据的子协程:
1 | def read(sock): |
我们在read
的基础上,构建read_all
读取整个消息:
1 | def read_all(sock): |
如果你从另一个角度看,yield from
语句消失了而且它就像一个普通阻塞I/O函数一样。但实际上,read
和read_all
函数都是协程。read
通过yield from
暂停read_all
直到I/O操作完成。当read_all
暂停的时候,异步I/O的事件循环会进行其它的任务并且等待其它I/O事件;read
在下次循环中当事件就绪,完成I/O操作时,read_all
恢复运行。
最后一步,fetch
调用了read_all
。
1 | class Fetcher: |
神奇的是,Task类不需要被修改,它可以像以前一样驱动fetch
协程。
1 | Task(fetcher.fetch()) |
当read
yield出一个future,task从yield from
管道接收它,就像这个future直接从fetch
yield出来一样。当循环解决一个future,task向fetch
发送它的结果,值被read
接收,这个过程就像task直接驱动read
一样。
为了完善我们的协程实现,我们再做一些打磨:我们的代码在等待一个future时使用yield
,在委托一个子协程的时候使用yield from
。如果我们不管是不是协程都使用yield from
,代码将会更精炼。这样,协程不需要关心它等待的东西的类型。
我们在Python生成器和迭代器的高度相似性中得到好处。对于调用者来说,推进一个生成器就像推进一个迭代器一样。所以我们用一个特殊的方法实现来让Future类变成可迭代对象。
1 | # Method on Future class. |
future中的__iter__
方法是一个yield它自身的协程。现在我们将我们的代码替换:
1 | # f is a Future. |
替换成:
1 | # f is a Future. |
结果是一样的!驱动Task从它调用的send
中接收一个future,当future被解决时它向协程发送一个新的结果。
在所有地方都使用yield from
的好处是什么?为什么比用yield
等待future然后yield from
委派子协程更好?原因是,一个方法可以快速地改变它的实现而不影响其调用者:它是一个普通的方法可以在future解决时返回一个值,或者是一个包含yield from
语句并且会返回一个值的协程。另一方面,调用者只需要yield from
这个方法然后等待结果就行了。
亲爱的读者,我们愉快地已经完成了对asynico中协程的愉快的阐述。我们专注于生成器的机制,并且简述了一个future类和task类的实现。我们指出了asynico如何利用了两个方面的优点:比线程更快和比回调函数更具有可读性的并发I/O。当然,真正的asynico会比我们的简化版本有着更精密。真正的框架需要处理零拷贝I/O,公平调度,异常处理,以及许多其它特性。
对于一个asynico用户来说,使用协程编程逼你在这里看到的版本要简便的多。在上面的代码中,我们从基本的原理去实现协程,所以你看到了回调函数,task类,future类。你也可以看到非阻塞sockets以及select
的调用。但是当你使用asynico去构建一个应用程序时,这些都不会在你的代码中出现。就像我们承诺的那样,你可以轻松地抓取一个URL:
1 |
|
对我们的探索还满意吗,回到我们最原始的任务:使用asynico编写一个异步I/O的网页爬虫。
使用协程工作
Coordinating Coroutines
我们从描述我们想让爬虫进行什么工作开始。现在是时候开始用asynico协程实现它了。
我们的爬虫会抓取第一个页面,解析上面的链接,然后把它们添加到队列中。然后它开始遨游整个网站,并发的抓取网页。但是因为客户端以及服务器的负载限制,我们需要设置最大的worker数量,而不是越多越好。当一个worker完成一个页面的抓取,必须从队列中拉起另外一个链接。在某些时期,我们将会没有足够的工作去完成,此时一些worker必须可以暂停。同时,一但有一个worker抓取到一个页面上的大量新链接,队列里面的链接会突然增加,暂停的worker应该立刻被唤醒开始工作。最后,当任务完成,我们的程序会马上退出。
想象一下如果那些worker是线程。我们要如何表达爬虫的算法呢?我们可以使用Pyhton标准库中的同步队列。每当一个新的项目被加入队列,队列增加它的”tasks”计数。worker线程完成一个项目的工作后,调用task_done
。主线程会在Quene.join
阻塞,直到放入队列的任务数量与task_done
的调用次数相等,然后退出。
协程使用一个asynico队列,使用跟线程几乎一样的模式来实现:
1 | try: |
我们把worker的状态收集在一个crawler类中,然后在它的crawl
方法中编写主逻辑。我们在一个协程中启动crawl
并且运行asynico事件循环直到crawl
完成
1 | loop = asyncio.get_event_loop() |
爬取器从一个根URL和max_redirect
开始,max_redirect
是最大重定向数。它把参数对(URL, max_redirect)
放在队列中。(为什么要这样做呢,我们稍后再说)
1 | class Crawler: |
现在队列中未完成的任务数是1。回到我们的主程序,启动时间循环和crawl
方法:
1 | loop.run_until_complete(crawler.crawl()) |
crawl
协程启动worker。它就像一个主线程一样:它在join
阻塞直到所有的任务完成,此时,worker在后台工作。
1 |
|
如果worker是线程,我们不会希望一次性全部创建。为了在确切需要线程之前避免线程的昂贵开销,通常需要按需增长的线程池。但是协程的开销很小,所以我们可以直接把它们全部创建出来。
值得注意的是,我们如何关闭爬虫呢?当join
future解决时,worker还是存活的,只是处于暂停状态:它们正在等待更多的URL但是已经没有了。所以,主协程在退出之前注销它们。否则,当Python解释器关闭以及调用所有对象的析构函数时,存活的worker会哭喊道:
1 | ERROR:asyncio:Task was destroyed but it is pending! |
那cancel
又是如何工作的呢?生成器还有一个没有向你展示的特性。你可以从外面抛一个异常给它。
1 | gen = gen_fn() |
生成器被throw
重新启动,但是它现在抛出一个异常。如果生成器的调用堆栈里面没有可以捕获这个异常的代码,这个异常将会被传递到顶层。所以注销一个协程。
1 | # Method of Task class. |
当生成器暂停时,它在某一个yield from
语句处恢复然后抛出一个异常。我们在task的step
方法中处理注销:
1 | # Method of Task class. |
现在task知道协程被注销了,所以当它被销毁时,它将不会再抱怨。
一旦crawl
注销了worker,它就会退出。事件循环看到这个协程结束了(我们会在晚一些看到),同样也会结束:
1 | loop.run_until_complete(crawler.crawl()) |
crawl
方法包含了所以我们主协程需要做的事情。而worker协程则是负责从队列中取出URL,抓取它们的页面,然后解析它们获得新的链接。每一个worker独立地运行一个work
协程:
1 |
|
Python看到代码中包含yield from
语句,然后把代码解释成生成器函数。所以在crawl
中,当主协程调用了10次self.work
时,它并没有实际运行这个方法:只是创建了10个指向这些代码的生成器对象。它们被包装成Task对象。Task接收每一个从生成器yield出的future,然后通过调用send
方法,用future解决时的结果作为参数,来驱动生成器。因为生成器有自己的栈帧,它们会独立地运行,有着独立的局部变量和指令指针。
worker通过队列与同伴协调。它这样等待新的URL:
1 | url, max_redirect = yield from self.q.get() |
队列的get
方法本来就是一个协程:它等待有人向队列加入一个新的条目,然后恢复返回那个条目。
碰巧,这也是当主协程注销worker时,最后crawl停止,worker协程暂停的地方。从协程的角度,yield from
抛出CancelledError
结束了它在循环中的最后旅程。
当worker爬取了一个页面时,他解析上面的链接,把新的链接放到队列中,然后调用task_done
来减小计数器。最终,一个worker抓取了一个没有新链接的页面,而且队列中也没有新的任务。这时worker对task_done
的调用使计数器减为零。而crawl
正阻塞在队列的join
方法上,现在它可以结束了。
我们说过要解释为什么队列中的元素是成对的:
1 | # URL to fetch, and the number of redirects left. |
新的URL的重定向次数是10。爬取这些特殊的URL会重定向到一个带末位斜杠的新位置。我们减少重定向的次数,然后把下一个新的位置放进队列:
1 | # URL with a trailing slash. Nine redirects left. |
我们使用的aiohttp
库会默认跟踪重定向并且向我们返回最终的结果。但是,我们现在要告诉它不要这样做,我们在爬虫中处理重定向,以便它可以合并那些相同的重定向路径:如果我们已经在self.seen_urls
中看到这个URL,说明它已经从其它的入口走过这条路了:
爬虫抓取了”foo”并发现它重定向到”baz”,所以它把”baz”加入到队列中和seen_urls
中。如果它抓取的下一个页面是”bar”,这个页面也重定向到”baz”,爬虫就不会在把它加入到队列中。如果它的响应是一个页面而不是重定向,fetch
会解析这个页面并且把新的连接加入队列中。
1 |
|
如果这是一些多线程代码,它会遇到讨厌的竞态条件。比如说,worker检查一个URL是否在seen_urls
中,如果没有,worker就会把它加入到队列中并且加入到seen_urls
中。如果它在这两个步骤之间中断,然后另一个worker从另一个页面解析到同样的链接,它并没有在seen_urls
,所以把它加入到队列中。现在同样的链接在队列中至少出现了两次,导致了重复的工作和错误的统计。
然而,一个协程只有在yield from
语句时才会被中断。这是协程比多线程程序少遇到竞争的关键:多线程必须明确的有锁地进入临界区,否则它可能是中断的。一个Python协程默认是不可以中断的,并且只会在它明确yield时才会交出控制权。
我们不再需要一个像我们的回调函数编程中的fetcher类了。这个类是一个有缺陷的回调的变通方法:它们需要一些位置来存储等待I/O时的状态,因为它们的局部变量不能在调用之间保留。但是fetch
协程可以像普通函数在一样局部变量中存储它的状态,所以它不再需要一个类。
当fetch
完成处理服务器响应的工作时,它返回到它的调用者worke
。work
对队列调用task_done
然后从队列取出要爬取的下一个URL。
当fetch
把新的链接放入队列中时,它增加未完成任务的计数器并且停留在主协程中,主协程在等待q.join
,处于暂停状态。而当页面中没有新的链接并且这是队列中最后一个URL时,work
调用task_done
,任务计数器减少到0。然后主协程从join
中退出。
协调worker和主协程的队列代码像这样:
1 | class Queue: |
主协程crawl
yield fromjoin
,所以当最后的worker把计数器减为0,它告诉crawl
恢复运行,然后结束。
旅程快要结束了。我们的程序从调用crawl
开始:
1 | loop.run_until_complete(self.crawler.crawl()) |
程序如何结束?因为crawl
是一个生成器函数,调用它会返回一个生成器。为了驱动它,asynico把它包装在一个task里:
1 | class EventLoop: |
当task完成的时候,它抛出StopError
异常,事件循环把这个异常作为正常退出的信号。
但task的add_done_callback
和result
方法又是什么呢?你可能想到一个task就像一个future。你的直觉是对的。我们必须承认一个我们像你隐藏的关于Task类的细节:一个task是一个future。
1 | class Task(Future): |
通常情况下,future通过别人调用它的set_result
来解决。但是一个task会在写协程停止的时候解决它自身。还记得我们之前关于Python协程探索,当一个生成器返回时,它会抛出一个特殊的异常StopIteration
:
1 | # Method of class Task. |
所以当事件循环调用task.add_done_callback(stop_callback)
,他就准备被这个task停止。再看一次run_until_complete
:
1 | # Method of event loop. |
当task捕获StopIteration
然后解决它自身,回调从循环中抛出StopError
。事件循环结束,调用堆栈回到run_until_complete
。我们的程序完成了。
结论
Conclusion
现代程序更多的是I/O密集型而不是CPU密集型。对于这些程序,Python多线程有两个缺陷:GIL全局锁使得它们实际上并没有进行并行计算,抢占式切换是的他们更容易出现竞争。异步I/O通常上来说是正确的模式。但是随着基于回调函数的代码的增加,会导致出现混乱难以读懂的程序。协程是一个简洁的替代者。它们可以自然地被分解成子过程,有着完整的异常处理和堆栈跟踪。
如果我们从另一个角度看yield from
语句,一个协程更像一个传统的做阻塞I/O的线程。我们可以使用与传统多线程编程相似的模式进行协程的编程,它不需要被重塑。因此,相比于回调函数,协程对于有着丰富检验的多线程程序员更为适合。
但是当我们关注yield from
语句的时候,我们可以看到协程放弃控制权,允许其它人运行的特点。不像线程那样,协程可以显示代码中可以中断的地方和不可以中断的地方。Glyph Lefkowitz富有启发性的文章写道:“线程让局部推理变得困难,而局部推理也许是软件开发中最重要的事情。”然而明确的yield,让“通过过程本身而不是整个系统去理解它的行为(和因此、正确性)”成为可能。
这一篇文章写于Python和异步的复兴时期。你刚刚学到的基于生成器的协程,包含在2014年3月发布的Python 3.4 的”asynico”库中。在2015年9月,Python 3.5发布,协程成为语言的一部分。这个原生的协程通过新的语法”async def”来声明,使用新的”await”关键字代替”yield from”委派协程或者等待一个Future。
尽管有了这些改进,核心思想还是不变的。Python的新原生协程与生成器语法不同,但是它们的工作原理非常相似;实际上,它们在Python解释器中共用一种实现方法。Task,Future,以及事件循环在asynico中还是扮演着同样的角色。
现在你知道了asynico协程是如何工作的,你可以忘记大部分的细节。它们的机制隐藏在一个整洁的接口下。但是理解这些基本原理可以让在现代异步环境中你高效而正确地编写代码。