翻译 | 500 Lines or Less 基于Python协程的网页爬虫 (一)

翻译自开源书籍500 Lines or Less中A Web Crawler With asyncio Coroutines一节。手翻略水,以原文为准。

A Web Crawler With asyncio Coroutines - A. Jesse Jiryu Davis and Guido van Rossum

这是文章的前半部分,后半部分见
翻译 | 500 Lines or Less 基于Python协程的网页爬虫 (二)

A. Jesse Jiryu Davis是纽约MongoDB的一名工程师。他编写了Motor,异步MongoDB Python驱动,并且他是MongoDB C 驱动的首席开发者和PyMongo团队的成员。同时,他也对asyncio和Tornada有贡献。他的主页是http://emptysqua.re.

Guido van Rossum是一种主要用于web及线下的编程语言——Python的创造者。Python社区称之为“仁慈的独裁者”,它是来自短剧Monty Python的一个标题。Guido的主页是http://www.python.org/~guido/

简介

Introduction

传统的计算机科学注重于可以以最快速度完成计算任务的更高效的算法。但是很多网络应用程序不把时间花在cpu计算上,而花在保持某些缓慢的、出现突发事件的网络连接上。这些应用程序带来了一个艰难的挑战:更有效率地等待大量的网络连接。一个解决这个问题的现代方案是使用asynchronous 异步I/O,也叫async。

本文提出一个简单的web爬虫。这个爬虫是一个典型的async应用程序,它在等待大量网络响应的同时只进行很少计算。爬虫能够获取的页面越多,它就越快结束。如果爬虫给每一个正在进行的请求分配一个线程,那么它会在系统sockets资源耗尽前先消耗完内存或其它线程相关的资源。使用异步I/O,就可以避免对大量线程的依赖。

我们通过三个阶段来实现这个例子。一,给出一个async事件循环并且编写一个爬虫通过回掉函数去调用这个循环,回调函数是一个高效的方式,但是把它扩展并应用在更复杂场景的时候会产生难以维护的“面条式代码”。二,我们展示了Python协程的高效性和可扩展性,我们使用Python的生成器函数来实现简单协程。最后一个阶段,我们使用Python标准库”asyncio”里面来创建具有完整功能的协程,并且用一个async队列来协调它。

爬虫的目标

The Task

一个web爬虫会找到并且下载一个网站上的所有页面,也许是为了索引或者归档它们。爬虫从一个根URL开始,抓取每一个页面并解析,抓取页面上没有访问过的链接,然后把链接加入一个队列。它会在找到的页面上没有未访问的链接,并在队列为空时结束。

我们可以通过同时下载多个页面来加速这个进程。当爬虫找到新的链接时,它会在不同的socket上同时为新的页面启动抓取操作。它在http响应到达时解析它们,并且把新的链接加入到队列中。过多并发请求带来的性能下降可能会导致最终的效能降低,所以我们限制了并发请求的数量,把剩余的链接放在队列里面知道某些正在进行的请求完成。

传统实现

The Traditional Approach

如何让我们的爬虫并发处理请求呢?传统上我们会创建一个线程池。每一个线程负责在一socket上下载一个页面。例如,从xkcd.com下载一个页面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def fetch(url):
sock = socket.socket()
sock.connect(('xkcd.com', 80))
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)

# Page is now downloaded.
links = parse_links(response)
q.add(links)

在默认状态下,socket操作是阻塞的:当线程调用一个方法例如connectrecv时,线程会暂停直到上面的操作完成[2]。因此,为了下载大量的页面,我们需要许多的线程。一个精巧的应用程序通过保持线程池中的空闲进程来控制创建进程所需要的系统花销,然后将它们检出以重复用于后续的任务,它就像管理sockets的连接池一样工作。

然而,线程会带来较大的系统开销,而且操作系统会对进程,用户或处理器可以拥有的线程数量有着各种各样严格的限制。在jesse的系统上,一个Python线程会占用大约50k的内存,而且启动数万个进程时会失败。如果我们在并发套接字上扩展到数万个并发操作,会在socket资源耗尽前耗尽线程资源。对线程来说,线程本身的开销和系统限制都是瓶颈。

Dan Kegel在”The C10K problem”[3]中指出了多线程并发I/O的局限性。他说:

是时候让web服务器同时处理上万个客户端了,不是吗?毕竟网络已经这么大了。

Kegel撰写”C10K”的时候是1999年。一万个连接在今天听起来不多,问题的规模已经发生了变化,本质还是不变的。在之前,给每一个连接分配一个线程是不现实的。现在的容量相比过去已经提高了好几个数量级。的确,我们的小爬虫可以在多线程的情况下很好地工作。但对于有着成千上万个连接的大规模应用程序来说,这个限制仍然是:大多数系统仍然可以创建socket,但是线程已经耗光了。如何克服这个问题呢?

异步I/O

Async

异步I/O使用非阻塞socket在单个线程上实现并发操作。回到我们的异步爬虫,在开始连接到服务器之前,我们先把socket设置为非阻塞:

1
2
3
4
5
6
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('xkcd.com', 80))
except BlockingIOError:
pass

令人恼火的是,非阻塞socket会在connect的过程中抛出一个异常,即使它的工作是正常的。这个异常复制了底层C函数的恼人行为,它会将errno设置为EINPROGRESS来告诉你它已经开始了。

现在,我们的爬虫需要一个方法来知道连接何时成功建立,然后它就可以发送HTTP请求。我们可以通过一个循环简单实现:

1
2
3
4
5
6
7
8
9
10
11
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
encoded = request.encode('ascii')

while True:
try:
sock.send(encoded)
break # Done.
except OSError as e:
pass

print('sent')

但是这个方法不仅浪费资源,而且不能有效地在使用多个socket的时候等待事件。在以前,BSD Unix对于这个问题的解决方案是使用select,一个等待事件发生在非阻塞socket或小数组上的C函数。如今,拥有大量连接的网络应用程序催生了类似于poll(轮询)的解决方案,例如BSD上的kqueue,和Linux上的epoll。这些API跟select很相似,而且在大量连接的使用场景下有着出色的表现。

Python 3.4的DefaultSelector会调用系统上与select最相似的函数。为了注册关于网络I/O的通知,我们创建一个非阻塞socket并用默认选择器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from selectors import DefaultSelector, EVENT_WRITE

selector = DefaultSelector()

sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('xkcd.com', 80))
except BlockingIOError:
pass

def connected():
selector.unregister(sock.fileno())
print('connected!')

selector.register(sock.fileno(), EVENT_WRITE, connected)

我们忽略掉那个虚假的错误,调用selector.register,传入socket的文件描述符和一个表示我们正在等待的事件的常量。为了在连接成功建立时是收到通知,我们传入EVENT_WRITE:即我们想知道socker什么时候处于可以写入的状态。同时,我们也传入了一个Python函数connected,它会在事件发生的时候运行。这个函数就是所谓的回调。

通过一个循环,我们在选择器收到I/O通知的时候处理它们。

1
2
3
4
5
6
def loop():
while True:
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback()

connected的存储形式是event_key.data,我们会在非阻塞socket连接成功时第一时间执行它。

与我们上面的简单(暴力?)循环不同,这里调用select的时候会暂停,等待下一个I/O事件。然后循环运行回调来等待这些事件。在收到事件循环的某个时间点之前,操作会保持挂起。

我们已经说明了什么?我们展示了当(连接)操作就绪时如何开始一个操作和执行一个回调。一个异步框架基于我们刚才所说的两种特性构建:非阻塞socket和事件循环——在一个线程上实现并发操作。

我们在这里已经实现了“并发”,但不是传统上说的“并发性”。也就是说,我们构建了一个可以进行重叠I/O的小系统。它有能力在其它操作正在进行的时候开始新的操作。它实际上并不是使用多个cpu核心来进行并行计算。但是,这种系统适用于I/O密集型问题,而不是CPU密集型问题。

所以我们的事件循环对于并发I/O是非常有效的,因为,它并不给每一个连接分配线程资源。但是在我们继续之前,还需要纠正一个重要的问题:异步I/O比多线程要快?在大多数情况下不是的,的确,在Python中,像我们这种事件循环链接在服务于少量活跃连接时会比多线程操作要慢一点。在没有全局锁的工况下,多线程会在这种工作负荷中有更好的表现。异步I/O的优势在于,应用在许多的缓慢连接和低频事件中。

用回调函数编程

Programming With Callbacks

到目前为止,我们已经建立起了简单的异步框架,那么如何编写一个web爬虫呢?即使编写一个简单URL爬取器也是痛苦的。

我们从存放已经爬取过的URL和看到的URL的两个set开始。(set:没有value的dict)

1
2
urls_todo = set(['/'])
seen_urls = set(['/'])

setseen_urls包含urls_todo的所有URL以及已经完成的URL。这两个set初始化时都包含根URL”/“。

爬取一个页面需要一系列的回调函数。当socket连接时会触发connected回调函数,然后给服务器发送一个GET请求。之后,它必须等待服务器的应答,所以它拉起另外一个回调函数。如果,在回调函数触发时,它不能读取全部的服务器响应,它会再次注册,以此类推。

我们把这些回调放在在Fetcher对象里。需要包含一个URL,一个sokcet对象,以及一个存放字节型响应的变量。

1
2
3
4
5
class Fetcher:
def __init__(self, url):
self.response = b'' # Empty array of bytes.
self.url = url
self.sock = None

Fetcher.fetch方法开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
# Method on Fetcher class.
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('xkcd.com', 80))
except BlockingIOError:
pass

# Register next callback.
selector.register(self.sock.fileno(),
EVENT_WRITE,
self.connected)

fetch方法从连接一个socket开始。但是要注意这个方法会在连接建立之前返回。它必须返回反正时间循环的控制以等待连接。为了理解原因,想象我们整个应用程序的结构是这样的:

1
2
3
4
5
6
7
8
9
# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
fetcher.fetch()

while True:
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback(event_key, event_mask)

select被调用时,事件循环会处理所有的事件通知。因此fetch必须手动控制事件循环,以便程序知道socket何时建立连接。只有这样,循环才会运行connected回调函数,这个回调函数是在fetch的结尾被注册的。

下面是connected的实现。

1
2
3
4
5
6
7
8
9
10
11
# Method on Fetcher class.
def connected(self, key, mask):
print('connected!')
selector.unregister(key.fd)
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
self.sock.send(request.encode('ascii'))

# Register the next callback.
selector.register(key.fd,
EVENT_READ,
self.read_response)

这个方法发送了一个GET请求。一个实际的应用程序会检测send的返回值,以防整个请求没有第一时间发送。但是我们的请求比较小,程序也不发杂。可以没有顾忌地调用send,然后等待响应。当然,它必须注册了下一个回调函数,并且将控制权交给了事件循环。下一个也是最后一个回调函数read_response,用于处理服务器的响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Method on Fetcher class.
def read_response(self, key, mask):
global stopped

chunk = self.sock.recv(4096) # 4k chunk size.
if chunk:
self.response += chunk
else:
selector.unregister(key.fd) # Done reading.
links = self.parse_links()

# Python set-logic:
for link in links.difference(seen_urls):
urls_todo.add(link)
Fetcher(link).fetch() # <- New Fetcher.

seen_urls.update(links)
urls_todo.remove(self.url)
if not urls_todo:
stopped = True

当selector发现socket是可读的时候,它便会执行这一个回掉函数。socket可读意味着,socket里面有数据并且已经关闭。

回调函数会从socket读取最多4kb的数据。如果数据少于4kb,chunk会存取所有可用的数据。如果大于4kb,chunk会读取4kb的数据,但这个时候socket仍然是可读状态,所以事件循环会在下一个周期再次运行这个回调函数。当响应处理完成,服务器会关闭连接并且socket和chunk都是空的。

parse_links方法没有给出,它返回一个存放URL的set。我们在每一个新的URL开始新的爬取器,并没有并发限制。注意异步编程中使用回掉函数的一个优点:我们不需要互斥地对共享数据进行存取,例如我们向seec_urls添加一个链接。这是非抢占式的多任务,所以我们不能在程序的任意点中断。

我们添加一个全局变量stopped用于循环的控制:

1
2
3
4
5
6
7
8
stopped = False

def loop():
while not stopped:
events = selector.select()
for event_key, event_mask in events:
callback = event_key.data
callback()

当所有页面下载下载完成时,抓取器会停止,全局时间循环和程序都会退出。

这个例子清楚的说明了异步编程的问题:产生“面条式代码”。我们需要一种方式去表达一系列的计算和I/O操作,以及并发执行多个系列操作。但是在不使用多线程的情况下,一系列的操作不能用单个函数表达:当函数开始一个I/O操作时,它显式地保存它就将来所需的任何状态,然后返回。你要负责的是构思和编写这种状态保存代码。

让我们来解释一下这是什么意思。考虑如何简单地在一个线程上使用常规的阻塞socket来抓取一个URL:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Blocking version.
def fetch(url):
sock = socket.socket()
sock.connect(('xkcd.com', 80))
request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)

# Page is now downloaded.
links = parse_links(response)
q.add(links)

在一个socket操作和下一个之间,这个函数会保存什么状态呢?它有socket,URL以及累积接收的响应。在线程上运行的函数使用编程语言的基本特性,把临时状态存储在其栈中的局部变量中。这种函数还有“附加部分”,也就是,在I/O完成的时候将要执行的代码。运行时,程序通过存储线程的指令指针来识别函数的“附加部分”。你不需要在I/O之后考虑恢复这些局部变量以及函数的附加部分。这是建立在语言的基础上的。

但是在基于回掉函数的异步框架中,这些语言特性没有任何的帮助。在等待I/O的过程中,函数必须显式地保存它的状态,因为函数会在I/O操作完成之前返回并且丢失栈区的内容。在替代回调函数的情况下,我们基于回调函数的实例把sockresponse存储为self的属性,属于Fetcher实例。为了替代指令指针,它通过注册connectedread_response回调函数来保存它的附加部分。随着应用程序复杂性的增加,我们需要在回调中手动保存的回调函数的状态复杂性也在增加。这种复杂的记法然程序员头痛。

更糟糕的是,如果回调函数在调用下一个回调函数之前抛出一个异常,会发生什么?如果我们的parse_links方法写的不够完善,它会在解析HTML的时候抛出这些异常:

1
2
3
4
5
6
7
8
9
10
Traceback (most recent call last):
File "loop-with-callbacks.py", line 111, in <module>
loop()
File "loop-with-callbacks.py", line 106, in loop
callback(event_key, event_mask)
File "loop-with-callbacks.py", line 51, in read_response
links = self.parse_links()
File "loop-with-callbacks.py", line 67, in parse_links
raise Exception('parse error')
Exception: parse error

堆栈跟踪只显示事件循环正在运行一个回调函数,我们并不知道什么导致了错误。链条在两端都断了,我们忘记了从哪里来,到哪里去?这种上下文的丢失我们称之为“堆栈翻录”,在许多情况下,它会干扰我们的调试。堆栈翻录还会阻止我们为回调函数链条添加一个异常处理语段,”try / except”封装的函数调用和它的后代树。

所以,除了关于多线程和异步I/O之间有着长时间的争论,还有另一个争论关于哪一种方法跟容易出错。如果你是在同步它们的时候出错,那么多线程更容易导致数据争用,而回调函数则会因为堆栈翻录而更难以调试。

协程

Coroutines

我们曾经许下诺言。可以编写异步代码使得更有效率的回调函数和多线程编程的经典外观结合起来。这种组合是通过一中叫“协程”的模式实现的。使用Python 3.4标准库asyncio,以及另一个叫aiohttp的库,可以很直接的在协程中抓取一个URL。

1
2
3
4
@asyncio.coroutine
def fetch(self, url):
response = yield from self.session.get(url)
body = yield from response.read()

这种方法也是可扩展的。相比每个线程50kb的内存开销和操作系统对线程严格的限制,一个运行在Jesse系统上的Python协程只需要很少的3kb内存。Python可以轻而易举地启动成千上万个协程。

协程的概念,可以追溯到计算机科学发展早期,很简单:协程是一种可以暂停和恢复的子程序。不同于线程是由操作系统管理的抢占式多任务处理,协程是协同时多任务处理:它们自己选择何时暂停,以及接下来运行哪一个协程。

协程的具体实现有很多,即使是Python中也有好几个。Python 3.4标准库”asyncio”中的协程实现是由生成器,Future类,以及”yield from”语句构成。
从Python3.5开始,协程成为Python语言的一个原生特征。然而,理解协程在Python 3.4中使用已经存在的语言工具的实现过程,是使用Python 3.5中原生支持的协程的基础。

为了解释Python 3.4中基于生成器的协程实现,我们将参与一个关于生成器的阐述,以及它们是如何在asynico库中实现协程的,相信你会像我们享受编写的过程一下享受阅读的过程。当我们解释清楚了基于生成器的协程是如何实现的,我们就可以把它用于我们的异步web爬虫中。

Python 生成器的工作原理

How Python Generators Work

在你理解Python的协程之前,必须先明白Python常规函数是如何工作的。通常情况下,当Python函数调用一个子程序,子程序会持有控制权直到子程序返回,或者抛出一个异常。然后控制权交还给它的调用者。

1
2
3
4
5
>>> def foo():
... bar()
...
>>> def bar():
... pass

标准的Python解释器使用C语言编写的,执行一个Python函数的C函数被称为PyEval_EvalFrameEx。它接受一个Python栈帧对象,在栈帧的上下文中评估Python的字节码。这是foo的字节码:

1
2
3
4
5
6
7
>>> import dis
>>> dis.dis(foo)
2 0 LOAD_GLOBAL 0 (bar)
3 CALL_FUNCTION 0 (0 positional, 0 keyword pair)
6 POP_TOP
7 LOAD_CONST 0 (None)
10 RETURN_VALUE

foo函数把bar加载到栈空间中并且调用它,然后从栈中弹出它的返回值,加载None到栈顶,然后返回None

PyEval_EvalFrameEx遇到CALL_FUNCTION字节码时,它创建一个新的Python栈帧并递归:也就是说,它在新的帧上递归地调用PyEval_EvalFrameEx,用于执行bar

理解Python栈帧在堆内存中的分配方式非常重要!Python解释器是一个普通的C程序,所以栈帧也是普通的栈帧。但是它操纵的栈帧在堆上。除去特殊的情况,它意味着Python的栈帧在函数调用结束后依然有效。我们交互式地查看在bar中当前的帧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
>>> import inspect
>>> frame = None
>>> def foo():
... bar()
...
>>> def bar():
... global frame
... frame = inspect.currentframe()
...
>>> foo()
>>> # The frame was executing the code for 'bar'.
>>> frame.f_code.co_name
'bar'
>>> # Its back pointer refers to the frame for 'foo'.
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'foo'

这个阶段是为Python生成器设置的,使用相同的构建块——代码对象和堆栈帧,来达到神奇的效果。

这是一个生成器函数:

1
2
3
4
5
6
7
>>> def gen_fn():
... result = yield 1
... print('result of yield: {}'.format(result))
... result2 = yield 2
... print('result of 2nd yield: {}'.format(result2))
... return 'done'
...

当Python执行到gen_fn字节码时,它会看到yield语句然后知道gen_fn是一个生成器函数,而不是普通函数。它设置一个标志来记住这个事实:

1
2
3
4
>>> # The generator flag is bit position 5.
>>> generator_bit = 1 << 5
>>> bool(gen_fn.__code__.co_flags & generator_bit)
True

当你调用一个生成器函数,Python发现生成器标志,然后它不会实际运行那个函数。相反,Python会创建一个生成器:

1
2
3
>>> gen = gen_fn()
>>> type(gen)
<class 'generator'>

Python生成器封装了一个栈帧,并引用了一些代码,即gen_fn的主体

1
2
>>> gen.gi_code.co_name
'gen_fn'

所有由gen_fn调用的生成器指向相同的代码。但是他们有着自己的栈帧。这个栈帧不在任何C函数栈上,它存放在堆内存中等待使用:

这个帧包含一个“最后指令”的指针,指示帧执行的最后一个指令。开始的时候,最后指令指针的值是-1,意味着生成器还没有开始运行:

1
2
>>> gen.gi_frame.f_lasti
-1

当我们调用send,生成器到达它第一个yield,然后暂停。send的返回值是1,这是gen传给yield表达式的内容:

1
2
>>> gen.send(None)
1

生成器指令指针现在是从头开始的3字节码,Python解释生成的代码是总共是56字节。

1
2
3
4
>>> gen.gi_frame.f_lasti
3
>>> len(gen.gi_code.co_code)
56

这个生成器可以在任何时候重启,被任意函数调用,因为它的栈帧并不是真正在栈中:而是在堆中。它在调用链中的位置不是固定的,而且它也不需要遵循普通函数所遵循的先入后出执行顺序。它是自由的,像浮云一样。

我们可以给生成器传递一个值”hello”,它将成为yield表达式的结果,然后生成器继续执行直到它产生2:

1
2
3
>>> gen.send('hello')
result of yield: hello
2

它的栈帧现在包含局部变量result

1
2
>>> gen.gi_frame.f_locals
{'result': 'hello'}

其它从gen_fn创建的生成器会拥有自己的栈帧和局部变量。

当我们再次调用send的时候,生成器会从第二个yield继续执行,然后抛出一个特殊的StopIteration结束:

1
2
3
4
5
>>> gen.send('goodbye')
result of 2nd yield: goodbye
Traceback (most recent call last):
File "<input>", line 1, in <module>
StopIteration: done

这个异常有一个值"done",是生成器的返回值。


第一部分完成,下接:
翻译 | 500 Lines or Less 基于Python协程的网页爬虫 (二)

-------------本文结束 感谢阅读-------------