2014-10-04
requests库是python一个优秀的HTTP库,使用它可以非常简单地执行HTTP的各种操作,例如GET、POST等。不过,这个库所执行的网络请求都是同步了,即cpu发出请求指令后,IO执行发送和等待等操作,在这段IO执行的时间里,cpu什么也不做,这样cpu的计算能力就被浪费了。所以,可以尝试把网络请求修改为异步的,也就是在IO发挥作用的这段时间,CPU去做这个程序里的其他事情,等IO收到响应的数据,CPU回来处理。下面介绍一下如何将requests变成一个异步HTTP库。
requests
安装:
sudo pip install requests
一个获取页面的示例(test01.py):
# -*- coding: utf-8 -*-
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import requests
r = requests.get('http://letiantian.xyz/')
print r.text
如果网络没问题,会获取http://letiantian.xyz/
页面的html代码。
gevent
gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.
话是这样说,gevent的效果是异步的。
安装:
sudo pip install gevent
不使用gevent,test02.py代码为:
# -*- coding: utf-8 -*-
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import urllib2
urls = ["http://letiantian.xyz/"] *10
def get_content(url):
data = urllib2.urlopen(url).read()
return data
for url in urls:
get_content(url)
使用gevent,test03.py代码为:
# -*- coding: utf-8 -*-
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import gevent
from gevent import monkey
monkey.patch_socket()
import urllib2
urls = ["http://letiantian.xyz/"] *10
def get_content(url):
data = urllib2.urlopen(url).read()
return data
jobs = [gevent.spawn(get_content, url) for url in urls]
gevent.joinall(jobs)
# print jobs[0].value
比较两者的运行时间:
zsh >> time python test02.py
python test02.py 0.05s user 0.02s system 0% cpu 7.362 total
zsh >> time python test03.py
python test03.py 0.06s user 0.01s system 3% cpu 2.100 total
可见,使用gevent时,cpu利用率较高,且程序速度也较快(3倍速度,也可能更快)。
grequests库:使用gevent封装requests
安装:
sudo pip install grequests
使用grequests库,test04.py代码如下:
# -*- coding: utf-8 -*-
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import grequests
urls = ["http://letiantian.xyz/"] *10
reqs = [grequests.get(url) for url in urls ]
response = grequests.map(reqs)
response0 = response[0]
# print response0.text
运行时间:
zsh >> time python test04.py
python test04.py 0.13s user 0.06s system 10% cpu 1.698 total
grequests库存放在https://github.com/kennethreitz/grequests,代码很少,很容易看懂。
下面结合grequests的源码分析一下test04.py。 grequests.get(url)
其实就是grequests.AsyncRequest('GET', url)
,grequests中是使用了functools.partial
对此进行了转换。 类AsyncRequest
的__init__
函数做了一些初始化函数,self.url就是初始化该对象时给出的url,self.session是一个requests.Session()对象。 类AsyncRequest
有一个send()方法,该方法就是调用self.session的request方法,将请求结果放入self.response中。 grequests
下也有一个send方法,其内容是:
def send(r, pool=None, stream=False):
if pool != None:
return pool.spawn(r.send, stream=stream)
return gevent.spawn(r.send, stream=stream)
pool是gevent.pool.Pool的一个实例,用来管理一组greenlet。无论是pool.spawn还是gevent.spawn都是负责执行异步请求。
上面的说通了,那么grequests下的map函数也好理解了。
def map(requests, stream=False, size=None, exception_handler=None):
requests = list(requests)
pool = Pool(size) if size else None
jobs = [send(r, pool, stream=stream) for r in requests]
gevent.joinall(jobs)
ret = []
for request in requests:
if request.response:
ret.append(request.response)
elif exception_handler:
exception_handler(request, request.exception)
return ret
后面贴了grequests的源码。
requests-futures库
requests-futures库也是将requests库封装成了一个异步库,不过使用的是python3中的新特性(在python2中也能使用,但需要额外安装futures库),实现得也很简单,此处就不介绍了。
参考
http://www.gevent.org/ https://github.com/kennethreitz/grequests http://www.gevent.org/gevent.monkey.html#gevent.monkey.patch_all https://github.com/ross/requests-futures https://pypi.python.org/pypi/futures http://www.2cto.com/kf/201308/232824.html http://www.gevent.org/gevent.pool.html
grequests的源码
# -*- coding: utf-8 -*-
"""
grequests
~~~~~~~~~
This module contains an asynchronous replica of ``requests.api``, powered
by gevent. All API methods return a ``Request`` instance (as opposed to
``Response``). A list of requests can be sent with ``map()``.
"""
from functools import partial
try:
import gevent
from gevent import monkey as curious_george
from gevent.pool import Pool
except ImportError:
raise RuntimeError('Gevent is required for grequests.')
# Monkey-patch.
curious_george.patch_all(thread=False, select=False)
from requests import Session
__all__ = (
'map', 'imap',
'get', 'options', 'head', 'post', 'put', 'patch', 'delete', 'request'
)
class AsyncRequest(object):
""" Asynchronous request.
Accept same parameters as ``Session.request`` and some additional:
:param session: Session which will do request
:param callback: Callback called on response.
Same as passing ``hooks={'response': callback}``
"""
def __init__(self, method, url, **kwargs):
#: Request method
self.method = method
#: URL to request
self.url = url
#: Associated ``Session``
self.session = kwargs.pop('session', None)
if self.session is None:
self.session = Session()
callback = kwargs.pop('callback', None)
if callback:
kwargs['hooks'] = {'response': callback}
#: The rest arguments for ``Session.request``
self.kwargs = kwargs
#: Resulting ``Response``
self.response = None
def send(self, **kwargs):
"""
Prepares request based on parameter passed to constructor and optional ``kwargs```.
Then sends request and saves response to :attr:`response`
:returns: ``Response``
"""
merged_kwargs = {}
merged_kwargs.update(self.kwargs)
merged_kwargs.update(kwargs)
try:
self.response = self.session.request(self.method,
self.url, **merged_kwargs)
except Exception as e:
self.exception = e
return self
def send(r, pool=None, stream=False):
"""Sends the request object using the specified pool. If a pool isn't
specified this method blocks. Pools are useful because you can specify size
and can hence limit concurrency."""
if pool != None:
return pool.spawn(r.send, stream=stream)
return gevent.spawn(r.send, stream=stream)
# Shortcuts for creating AsyncRequest with appropriate HTTP method
get = partial(AsyncRequest, 'GET')
options = partial(AsyncRequest, 'OPTIONS')
head = partial(AsyncRequest, 'HEAD')
post = partial(AsyncRequest, 'POST')
put = partial(AsyncRequest, 'PUT')
patch = partial(AsyncRequest, 'PATCH')
delete = partial(AsyncRequest, 'DELETE')
# synonym
def request(method, url, **kwargs):
return AsyncRequest(method, url, **kwargs)
def map(requests, stream=False, size=None, exception_handler=None):
"""Concurrently converts a list of Requests to Responses.
:param requests: a collection of Request objects.
:param stream: If True, the content will not be downloaded immediately.
:param size: Specifies the number of requests to make at a time. If None, no throttling occurs.
:param exception_handler: Callback function, called when exception occured. Params: Request, Exception
"""
requests = list(requests)
pool = Pool(size) if size else None
jobs = [send(r, pool, stream=stream) for r in requests]
gevent.joinall(jobs)
ret = []
for request in requests:
if request.response:
ret.append(request.response)
elif exception_handler:
exception_handler(request, request.exception)
return ret
def imap(requests, stream=False, size=2, exception_handler=None):
"""Concurrently converts a generator object of Requests to
a generator of Responses.
:param requests: a generator of Request objects.
:param stream: If True, the content will not be downloaded immediately.
:param size: Specifies the number of requests to make at a time. default is 2
:param exception_handler: Callback function, called when exception occured. Params: Request, Exception
"""
pool = Pool(size)
def send(r):
return r.send(stream=stream)
for request in pool.imap_unordered(send, requests):
if request.response:
yield request.response
elif exception_handler:
exception_handler(request, request.exception)
pool.join()