【异步爬虫】学习笔记


一、asyncio

  • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。
  • coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件循环调用。我们可以使用 - async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
  • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。
  • future:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。

1、定义协程

第一个例子

import asyncio


async def execute(x):
    print('Number:', x)

coroutine = execute(1)
print('After calling execute')
print('Coroutine:', coroutine)
print('*'*40, '分割线', '*'*40)
print('After calling loop')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

输出结果为:

After calling execute
Coroutine: <coroutine object execute at 0x000002836EB16C40>
**************************************** 分割线 ****************************************
After calling loop
Number: 1

首先,引入asyncio这个包,这样菜鸟使用asyncawait
然后,使用async定义一个execute()方法,这个方法的功能是,接收一个数字之后,打印出这个数字;
紧接着,尝试直接调用这个方法,但这个方法没有执行,而是返回了一个coroutine协程对象
之后,使用get_event_loop()方法创建一个事件循环loop,并调用loop对象run_until_complete()方法将协程注册到loop中,并启动;
这次之后,就可以看到execute()方法输出的结果。

结论: async定义的方法会编程一个无法执行的coroutine协程对象,必须将其注册到事件循环中才能执行。


一开始,还提到了task,相对于coroutine对象,它多了运行状态,我们可以根据这些状态来获取协程对象的执行情况。

上个例子,当我们将coroutine对象传递给run_until_complete()方法的时候,实际上它进行了一个操作就是将coroutine封装成了task对象

  • 实操验证一下:
import asyncio


async def execute(x):
    print('Number:', x)

coroutine = execute(1)
print('After calling execute')
print('Coroutine:', coroutine)
print('*'*40, '分割线', '*'*40)
print('After calling loop')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('task:', task)
loop.run_until_complete(task)
print('task:', task)
  • 输出结果:
    After calling execute
    Coroutine: <coroutine object execute at 0x000001A58E626B40>
    **************************************** 分割线 ****************************************
    After calling loop
    task: <Task pending name='Task-1' coro=<execute() running at ‘这里是文件路径’:10>>
    Number: 1
    task: <Task finished name='Task-1' coro=<execute() done, defined at ‘这里是文件路径’:10> result=None>
    这里在定义loop对象之后,紧接着调用了它的create_task()方法coroutine对象转化为了task对象,然后打印输出,发现它是pending状态;
    然后,将task对象添加到事件循环中得到执行,紧接着再打印输出,发现它的状态变成了finished,与此同时,还可以发现result变成了1(也就是定义的execute()方法的返回结果)。

直接通过asyncio的ensure_future()方法,不需要借助loop来定义,也可以返回task对象。

  • 实操验证一下:
import asyncio


async def execute(x):
    print('Number:', x)

coroutine = execute(1)
print('After calling execute')
print('Coroutine:', coroutine)
print('*'*40, '分割线', '*'*40)
print('After calling loop')
task = asyncio.ensure_future(coroutine)
print('task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('task:', task)
  • 输出结果:
    After calling execute
    Coroutine: <coroutine object execute at 0x000001A58E626B40>
    **************************************** 分割线 ****************************************
    After calling loop
    task: <Task pending name='Task-1' coro=<execute() running at ‘这里是文件路径’:10>>
    Number: 1
    task: <Task finished name='Task-1' coro=<execute() done, defined at ‘这里是文件路径’:10> result=None>
    和上例的结果是一样的

    2、asyncio的一些方法

  • 官方文档协程与任务

①、asyncio.wait()

可以将一个操作分成多个部分并分开执行,而wait(tasks)可以被用于中断任务集合(tasks)中的某个被事件循环轮询到的任务,直到该协程的其他后台操作完成才被唤醒。

  • 例如:
    import time
    import asyncio
    
    
    async def task_1():
        print('开始运行IO任务1...')
        await asyncio.sleep(3)  # 假设该任务耗时3s
        print('IO任务1已完成,耗时3s')
        return task_1.__name__
    
    
    async def task_2():
        print('开始运行IO任务2...')
        await asyncio.sleep(2)  # 假设该任务耗时2s
        print('IO任务2已完成,耗时2s')
        return task_2.__name__
    
    
    async def main():  # 调用方
        tasks = [task_1(), task_2()]  # 把所有任务添加到task中
        done,pending = await asyncio.wait(tasks)  # 子生成器
        for r in done:  # done和pending都是一个任务,所以返回结果需要逐个调用result()
            print('协程无序返回值:'+r.result())
    
    if __name__ == '__main__':
        start = time.time()
        loop = asyncio.get_event_loop() # 创建一个事件循环对象loop
        try:
            loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束
        finally:
            loop.close()  # 结束事件循环
        print('所有IO任务总耗时%.5f秒' % float(time.time()-start))
  • 输出结果:
开始运行IO任务1...
开始运行IO任务2...
IO任务2已完成,耗时2s
IO任务1已完成,耗时3s
协程无序返回值:task_2
协程无序返回值:task_1
所有IO任务总耗时3.00769
  • 其中:
done, pending = await asyncio.wait(aws)

此处并发运行传入的aws(awaitable objects),同时通过await返回一个包含(done, pending)的元组,done表示已完成的任务列表,pending表示未完成的任务列表。
注:
①只有当给wait()传入timeout参数时才有可能产生pending列表。
②通过wait()返回的结果集是按照事件循环中的任务完成顺序排列的,所以其往往和原始任务顺序不同。

②、asyncio.gather()

如果只关心协程并发运行后的结果集合,可以使用gather(),它不仅通过await返回仅一个结果集,而且这个结果集的结果顺序是传入任务的原始顺序。

  • 例如:
import time
import asyncio
async def taskIO_1():
    print('开始运行IO任务1...')
    await asyncio.sleep(3)  # 假设该任务耗时3s
    print('IO任务1已完成,耗时3s')
    return taskIO_1.__name__
async def taskIO_2():
    print('开始运行IO任务2...')
    await asyncio.sleep(2)  # 假设该任务耗时2s
    print('IO任务2已完成,耗时2s')
    return taskIO_2.__name__
async def main(): # 调用方
    resualts = await asyncio.gather(taskIO_1(), taskIO_2()) # 子生成器
    print(resualts)

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop() # 创建一个事件循环对象loop
    try:
        loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束
    finally:
        loop.close() # 结束事件循环
    print('所有IO任务总耗时%.5f秒' % float(time.time()-start))
  • 结果为:
开始运行IO任务1...
开始运行IO任务2...
IO任务2已完成,耗时2s
IO任务1已完成,耗时3s
['taskIO_1', 'taskIO_2']
所有IO任务总耗时3.00936

gather()通过await直接返回一个结果集列表,我们可以清晰的从执行结果看出来,虽然任务2是先完成的,但最后返回的结果集的顺序是按照初始传入的任务顺序排的。

③、asyncio.as_completed()

as_completed(tasks)是一个生成器,它管理着一个协程列表(此处是传入的tasks)的运行。当任务集合中的某个任务率先执行完毕时,会率先通过await关键字返回该任务结果。可见其返回结果的顺序和wait()一样,均是按照完成任务顺序排列的。

  • 例如:
import time
import asyncio
async def taskIO_1():
    print('开始运行IO任务1...')
    await asyncio.sleep(3)  # 假设该任务耗时3s
    print('IO任务1已完成,耗时3s')
    return taskIO_1.__name__
async def taskIO_2():
    print('开始运行IO任务2...')
    await asyncio.sleep(2)  # 假设该任务耗时2s
    print('IO任务2已完成,耗时2s')
    return taskIO_2.__name__
async def main(): # 调用方
    tasks = [taskIO_1(), taskIO_2()]  # 把所有任务添加到task中
    for completed_task in asyncio.as_completed(tasks):
        resualt = await completed_task # 子生成器
        print('协程无序返回值:'+resualt)

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop() # 创建一个事件循环对象loop
    try:
        loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束
    finally:
        loop.close() # 结束事件循环
    print('所有IO任务总耗时%.5f秒' % float(time.time()-start))
  • 结果为:
开始运行IO任务2...
开始运行IO任务1...
IO任务2已完成,耗时2s
协程无序返回值:taskIO_2
IO任务1已完成,耗时3s
协程无序返回值:taskIO_1
所有IO任务总耗时3.00300

从上面的程序可以看出,使用as_completed(tasks)wait(tasks)相同之处是返回结果的顺序是协程的完成顺序,这与gather()恰好相反。而不同之处是as_completed(tasks)可以实时返回当前完成的结果,而wait(tasks)需要等待所有协程结束后返回的done去获得结果。

二、aiohttp


1、多线程与异步的区别


异步爬虫不同于多进程爬虫,它使用单线程(即仅创建一个事件循环,然后把所有任务添加到事件循环中)就能并发处理多任务。在轮询到某个任务后,当遇到耗时操作(如请求URL)时,挂起该任务并进行下一个任务,当之前被挂起的任务更新了状态(如获得了网页响应),则被唤醒,程序继续从上次挂起的地方运行下去。极大的减少了中间不必要的等待时间。


2、aiohttp安装

  • 原因
    aiohttp库,用来实现异步网页请求等功能,相当于异步版的requests
  • 安装
pip3 install aiohttp

3、ClientSession

在协程中使用ClientSession()get()request()方法来请求网页。(其中async with异步上下文管理器,其封装了异步实现等功能)

  • 例如:
import aiohttp
import asyncio


async def get_text():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get') as resp:
            print(resp.status)
            print(await resp.text())


def main():
    loop = asyncio.get_event_loop()
    task = get_text()
    loop.run_until_complete(task)
    loop.close()


if __name__ == '__main__':
     main()
  • 输出结果为:
200
{
  "args": {}, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Host": "httpbin.org", 
    "User-Agent": "Python/3.8 aiohttp/3.8.1", 
    "X-Amzn-Trace-Id": "Root=1-6216f458-66a3ef0733ff064f527672f8"
  }, 
  "origin": "36.153.167.77", 
  "url": "http://httpbin.org/get"
}
  • 还可以这样使用:
session.request(method='GET', url='http://httpbin.org/request')

三、参考文章

Python 中异步协程的使用方法介绍

Python异步IO之协程(二):使用asyncio的不同方法实现协程

Python实战异步爬虫(协程)+分布式爬虫(多进程)


Author: Polaris119
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint polocy. If reproduced, please indicate source Polaris119 !
评论
  TOC