Python异步循环嵌套常见问题根源与避坑指南
先跟你聊个真实案例。
上个月接到一个任务:写一个爬虫,需要抓取一万个页面。每个页面内嵌几十张图片链接,还得一并下载。
听起来很简单对吧?直接用 requests 循环一万次,再内嵌几十次循环,搞定。
但转念一想,根本行不通。一万个页面,每个页面几十张图,累计几十万次网络请求。如果同步一个一个等,估计跑完时新版都要上线了。
那就上异步。asyncio、aiohttp 全部安排。
代码写完,一跑——慢得离谱。跟同步几乎没区别,异步优势完全没体现出来。
让人抓狂的是,问题到底出在哪里?
熬了一整夜,翻了几百篇帖子,最后才发现罪魁祸首藏在一个极其不起眼的细节里。
今天把这个坑完整拆解给你。保证听完之后,不止知道怎么避坑,还能真正掌握异步循环嵌套的核心逻辑。
先搭一个场景
咱们做一个精简示例。假设从三个网站抓数据,每个网站需要先请求 page 接口(耗时1秒),再依据返回结果请求 detail 接口(同样耗时1秒)。
同步写法很直接:
import time
def fetch_page(site):
time.sleep(1) # 模拟网络延迟
return f"{site} 的数据"
def fetch_detail(site):
time.sleep(1)
return f"{site} 的详细信息"
def main():
sites = ["site_a", "site_b", "site_c"]
for site in sites:
page = fetch_page(site)
detail = fetch_detail(site)
print(page, detail)
start = time.time()
main()
print(f"耗时: {time.time() - start:.2f}秒")
跑一下,耗时约6秒。每个站点2秒,三个站点恰好6秒。符合预期。
那么异步版本呢?理想情况下三个站点的请求可以同时进行,总耗时只需2秒左右。
编写异步版本:
import asyncio
async def fetch_page(site):
await asyncio.sleep(1)
return f"{site} 的数据"
async def fetch_detail(site):
await asyncio.sleep(1)
return f"{site} 的详细信息"
async def process_site(site):
page = await fetch_page(site)
detail = await fetch_detail(site)
return page, detail
async def main():
sites = ["site_a", "site_b", "site_c"]
tasks = [process_site(site) for site in sites]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
start = time.time()
asyncio.run(main())
print(f"耗时: {time.time() - start:.2f}秒")
这个版本耗时多少?2秒左右,完美。
看起来很简单,对吧?但就是在这个基础上,稍加一层循环嵌套,问题立刻就暴露了。
真实代码的结构
当时的代码大概长这样:
async def fetch_page(site, page_num):
await asyncio.sleep(0.1) # 模拟页面请求
return f"{site} 第{page_num}页的数据"
async def fetch_images(page_data):
await asyncio.sleep(0.05) # 模拟图片请求
return [f"image_{i}" for i in range(3)]
async def process_site(site):
all_images = []
# 外层循环:遍历当前站点的每一页
for page_num in range(1, 11): # 假设每个站点有10页
page_data = await fetch_page(site, page_num)
# 内层循环:处理该页的所有图片
images = await fetch_images(page_data)
all_images.extend(images)
return all_images
async def main():
sites = ["site_a", "site_b", "site_c"]
tasks = [process_site(site) for site in sites]
results = await asyncio.gather(*tasks)
乍一看没毛病?外层循环是站点,内层循环是站点内的每一页,每页里的图片又用异步请求。听起来不错。
但实际跑起来,三个站点之间确实是并发的,可每个站点内部的10页是顺序执行的——先请求第1页,等返回后,再请求第1页的图片,然后才能开始第2页,再等图片,再第3页……
相当于三个站点各自排成一队,一页一页地处理。完全浪费了“一页内多张图片可以同时下载”的优化机会。
更严重的是,如果每个站点有100页,每页50张图,这个顺序执行的瓶颈会被放大100倍。
问题非常隐蔽,不画执行顺序根本看不出来。直到在纸上把时序图理清,才恍然大悟。
画出执行顺序你就懂了
咱们手动画一下执行过程。假设只有两个站点,每个站点两页,每页两张图。
当初那种代码的执行顺序:
站点A:请求第1页 → 等待 → 拿到数据 → 请求图1 → 等待 → 请求图2 → 等待 → 存图
站点A:请求第2页 → 等待 → 拿到数据 → 请求图1 → 等待 → 请求图2 → 等待 → 存图
站点B:请求第1页 → 等待 → 拿到数据 → 请求图1 → 等待 → 请求图2 → 等待 → 存图
站点B:请求第2页 → 等待 → 拿到数据 → 请求图1 → 等待 → 请求图2 → 等待 → 存图
每次“等待”时,CPU其实都空闲,但程序就是不去处理其他任务,非要等当前请求返回。
这就是问题核心:await 会挂起当前异步函数,但只挂起自己,不会影响同一层次的其他任务。
换句大白话说:
当你在一个异步函数里写 await something(),这个函数会停在这里,等 something() 完成。但这不意味着整个程序停止——程序可以切换到其他异步任务,比如另一个站点的任务。
所以在上面的代码中,process_site('site_a') 在等待第1页返回时,程序确实可以去处理 process_site('site_b')。这一点没问题,因此三个站点之间有并发。
问题在于:同一个 process_site 任务内部,for 循环里的每一次 await 都会让这个任务停顿,直到这次请求完成,才能进入下一次循环。内层循环同理。
所以每个站点内部的所有请求都是串行的。
正确的做法是什么?
如果你想在一个站点内部也实现并发,就需要把独立的请求批量收集起来,然后用 asyncio.gather 或 asyncio.wait 一次性发出。
以图片下载为例,正确做法是:先收集该页的所有图片链接,然后一次性创建所有图片的异步任务,同时等待全部完成。
代码大致如下:
async def process_site_correct(site):
all_images = []
for page_num in range(1, 11):
page_data = await fetch_page(site, page_num)
# 先提取本页所有图片链接
image_urls = extract_image_urls(page_data)
# 关键在这里:一次性创建所有图片任务,并发执行
image_tasks = [fetch_image(url) for url in image_urls]
images = await asyncio.gather(*image_tasks)
all_images.extend(images)
return all_images
这样修改后,执行顺序变为:
站点A:请求第1页 → 等待
(等待期间,站点B可以执行自己的任务)
第1页返回 → 同时请求该页的所有图片(假设10张图同时发送)
等待所有图片返回 → 然后继续第2页
图片下载部分从串行变成了并发。
但这里还有优化空间:页面请求本身能不能也并发?比如一个站点有10页,能不能同时请求这些页面?
可以。但需要注意:同时请求10页可能对目标服务器造成压力,也可能让自己的网络连接数爆满。合理控制并发数是另一个话题,今天不展开。
更隐蔽的坑:嵌套循环里的 gather
再深入一层。假设每个站点的每一页,返回的数据里不仅包含图片链接,还需要额外调用其他 API(比如每张图片要请求一个评论接口)。
此时代码可能变成这样:
async def fetch_image_with_comments(image_url):
image_data = await fetch_image(image_url)
comments = await fetch_comments(image_url)
return {"image": image_data, "comments": comments}
async def process_page(page_num):
page_data = await fetch_page(page_num)
image_urls = extract_urls(page_data)
# 这里看起来是并发的
tasks = [fetch_image_with_comments(url) for url in image_urls]
results = await asyncio.gather(*tasks)
return results
看上去没问题吧?每个 fetch_image_with_comments 内部是串行的(先等图,再等评论),但不同图片之间是并发的。
这已经相当好了。
但如果你写出这样的代码:
# 错误示范
async def fetch_image_with_comments_wrong(image_url):
# 里面又套了一层循环?或者用了 gather 但忘了 await?
tasks = [fetch_image(image_url), fetch_comments(image_url)]
# 此处没有 await,返回的是一个协程对象,不是结果
return asyncio.gather(*tasks) # 注意:这里没有 await
你会发现结果不对,更糟糕的是程序根本没执行这些请求,因为你返回的是尚未调度的协程对象。
这属于另一个经典错误:asyncio.gather 返回的是一个 awaitable 对象,必须 await 它,或者用 asyncio.run 执行,否则不会真正运行。
调试方法:打日志看时间
如果不确定自己的异步代码是否真的在并发,最简单的方法就是打时间戳。
import time
async def fetch_with_log(name, delay):
start = time.time()
print(f"[{start:.3f}] 开始 {name}")
await asyncio.sleep(delay)
end = time.time()
print(f"[{end:.3f}] 结束 {name},耗时 {end-start:.2f}秒")
return name
async def test_serial():
print("串行版本:")
for i in range(3):
await fetch_with_log(f"任务{i}", 0.5)
async def test_concurrent():
print("并发版本:")
tasks = [fetch_with_log(f"任务{i}", 0.5) for i in range(3)]
await asyncio.gather(*tasks)
# 跑一下就能看到区别
# 串行:开始时间依次相差0.5秒
# 并发:三个任务的开始时间几乎相同
这个技巧我用过无数遍。每当你怀疑某个循环是否串行时,把关键操作加上日志,观察开始时间是否挤在一起。
如果开始时间呈串联状,那就是串行;如果几乎同时打印,那就是并发。
几条简单规则
根据经验,归纳几条实用规则:
规则1:看到 await 在循环里,立刻警惕
for 循环内部直接 await 一个异步函数,这个循环必定是串行的。除非你刻意要串行,否则应考虑先收集任务再用 gather。
规则2:明确“谁和谁可以并发”
不同站点之间:可以并发
同一站点的不同页面:如果服务器能承受,可以并发
同一页面里的不同图片:可以并发
同一张图的下载和评论请求:通常不能并发(存在依赖关系)
规则3:gather 不是万能药,它只是“同时等待”
很多人以为用了 gather 就自动并发。其实 gather 只做一件事:把传入的多个协程同时调度,然后等待所有完成。前提是这些任务本身互相独立。
如果你传给 gather 一堆 [fetch_page(1), fetch_page(2), fetch_page(3)],三个请求会同时发出,效果很好。
但如果传的是 [process_page(1), process_page(2), process_page(3)],而每个 process_page 内部又是串行的,那么 gather 也无济于事。
规则4:异步不等于自动并行
这是最容易误解的一点。async/await 只提供了“等待时不阻塞”的能力,而不是“自动将循环拆分成多线程”。并发需要你显式使用 gather、create_task、wait 等工具来组织。
回到那个爬虫
最终那个爬虫改成了这样:
async def process_site_optimized(site):
# 先获取该站点所有需要抓取的页面列表
page_tasks = [fetch_page(site, page_num) for page_num in range(1, 101)]
# 同时请求所有页面(用 semaphore 限制并发数)
pages_data = await limited_gather(page_tasks, max_concurrent=10)
# 收集所有图片 URL
all_image_tasks = []
for page_data in pages_data:
image_urls = extract_image_urls(page_data)
all_image_tasks.extend([fetch_image(url) for url in image_urls])
# 同时下载所有图片(同样限制并发)
images = await limited_gather(all_image_tasks, max_concurrent=20)
return images
这里的 limited_gather 是自己封装的一个函数,利用 asyncio.Semaphore 控制同时进行的请求数量。这样既发挥了异步并发的优势,又不会打爆服务器或耗尽连接池。
改完之后,原本需要20分钟的任务,1分多钟就完成了。
最后说几句
异步编程的难点不在于 async/await 这两个关键字,而在于思维模式的转换。
在同步编程中,你写 for 循环,脑子里的逻辑是“一个一个来”。在异步编程中,你需要思考“哪些事情可以同时做,哪些事情必须等”。
当你看到嵌套循环时,不要急着写代码。先在纸上画一下:外层循环的每一次迭代,是否依赖上次结果?内层循环的每一次迭代,是否互相依赖?
如果不依赖,那它们就可以并发。并发的方式就是把所有任务收集到一个列表里,然后一次性 await gather。
原理就这么简单。
可就是这件简单的事,让很多人加班到凌晨。
希望你看完这篇文章后,再也不用那样熬夜了。
