大白话聊聊 Python 的 concurrent.futures 模块里的 ThreadPoolExecutor
用大白话聊聊 Python 的 concurrent.futures 模块里的 ThreadPoolExecutor,从啥是它到怎么用,简单易懂地一步步讲。
啥是 ThreadPoolExecutor?
想象你在开饭店,生意火爆,点单的人太多,你一个人忙不过来。咋办?雇几个服务员,组成一个“服务员小队”,让他们一起招呼客人。你不用管每个服务员具体干啥,只管把任务(比如“给1号桌送菜”)扔给小队,他们自己会分工干活。ThreadPoolExecutor 就是这个“服务员小队”,它是一个线程池,帮你管理一堆线程,自动分配任务,避免你手动创建和控制每个线程的麻烦。
简单说:它是一个多线程的“任务分发器”,省心又高效。
基础用法:开个小队干活
- 最简单的起步
咱先用 ThreadPoolExecutor 跑几个任务:
from concurrent.futures import ThreadPoolExecutor
import time
def worker(name):
print(f"工人 {name} 开始干活")
time.sleep(1) # 假装干活1秒
return f"工人 {name} 干完了"
# 创建一个线程池
with ThreadPoolExecutor(max_workers=2) as executor:
# 扔两个任务给线程池
future1 = executor.submit(worker, "张三")
future2 = executor.submit(worker, "李四")
# 等结果
print(future1.result())
print(future2.result())
输出:
工人 张三 开始干活
工人 李四 开始干活
(等1秒)
工人 张三 干完了
工人 李四 干完了
- ThreadPoolExecutor(max_workers=2):开个线程池,最多2个线程(服务员)。
- executor.submit(函数, 参数):扔一个任务给线程池,返回一个 Future 对象。
- future.result():等着拿任务结果(阻塞直到任务完成)。
- 啥是 Future?
Future 就像你给服务员的任务单,上面写着“啥时候干完我来取结果”。可以用它检查任务状态或拿结果。
中级用法:批量干活
- 扔一堆任务
如果任务多,手动一个个 submit 太累,可以用 map:
from concurrent.futures import ThreadPoolExecutor
import time
def worker(name):
time.sleep(1)
return f"工人 {name} 干完了"
with ThreadPoolExecutor(max_workers=2) as executor:
names = ["张三", "李四", "王五"]
results = executor.map(worker, names) # 批量扔任务
for result in results:
print(result)
输出:
(等1秒)
工人 张三 干完了
工人 李四 干完了
(再等1秒)
工人 王五 干完了
- executor.map(函数, 可迭代对象):把一堆参数扔给函数,线程池自动分配。
- 总耗时2秒(2个线程并行干,3个任务分两波)。
- 不等结果:异步跑 有时候你不想等着,可以让任务自己跑:
from concurrent.futures import ThreadPoolExecutor
import time
def worker(name):
time.sleep(1)
print(f"工人 {name} 干完了")
with ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(worker, "张三")
executor.submit(worker, "李四")
print("我先干别的,不等你们")
- 不调用 result(),主线程就不用等,任务在后台跑。
高级用法:更灵活地玩
- 检查状态和回调
可以用 Future 检查任务状态,或者加个“干完通知我”的回调:
from concurrent.futures import ThreadPoolExecutor
import time
def worker(name):
time.sleep(1)
return f"工人 {name} 干完了"
def done_callback(future):
print("任务完成了,结果是:", future.result())
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(worker, "张三")
future.add_done_callback(done_callback) # 加回调
print("任务状态:", future.done()) # False,还没干完
time.sleep(2)
print("任务状态:", future.done()) # True,干完了
输出:
任务状态: False
(等1秒)
任务完成了,结果是: 工人 张三 干完了
(再等1秒)
任务状态: True
- future.done():检查任务是否完成。
- adddonecallback(函数):任务一完成就自动调用这个函数。
- 处理异常 任务崩了咋办?Future 能捕获异常:
from concurrent.futures import ThreadPoolExecutor
def risky_worker():
raise ValueError("我崩了!")
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(risky_worker)
try:
future.result() # 拿结果
except Exception as e:
print("抓到错误:", e)
输出:
抓到错误: 我崩了!
- result() 会抛出任务里的异常,方便处理。
- 控制线程数 max_workers 是线程池大小,调大调小看需求:
- 太小(比如1):任务排队,效率低。
- 太大(比如100):线程切换开销大,可能更慢。 通常设成 CPU 核心数(os.cpu_count())或者任务数的两三倍,具体看情况试。
实际场景:下载文件 假设你有几个文件要下载:
from concurrent.futures import ThreadPoolExecutor
import time
def download_file(url):
print(f"开始下载 {url}")
time.sleep(2) # 模拟下载
return f"{url} 下载完成"
urls = ["文件1", "文件2", "文件3"]
with ThreadPoolExecutor(max_workers=2) as executor:
results = executor.map(download_file, urls)
for result in results:
print(result)
输出:
开始下载 文件1
开始下载 文件2
(等2秒)
文件1 下载完成
文件2 下载完成
开始下载 文件3
(再等2秒)
文件3 下载完成
2个线程并行,总耗时4秒,比单线程6秒快多了。
总结:ThreadPoolExecutor 的精髓
- 基础:用 submit 扔单个任务,拿 Future 等结果。
- 中级:用 map 批量跑任务,或者不等结果异步跑。
- 高级:加回调、处理异常、调线程数,灵活应对复杂场景。
它比手动 threading.Thread 省事,像个自动化的“任务管家”。适合 CPU 密集任务少(因为 GIL 限制),但 I/O 密集任务(比如下载、读写文件)很香。
文档信息
版权声明:可自由转载(请注明转载出处)-非商用-非衍生
发表时间:2025年3月2日 21:04