大白话聊聊 Python 的 concurrent.futures 模块里的 ThreadPoolExecutor

用大白话聊聊 Python 的 concurrent.futures 模块里的 ThreadPoolExecutor,从啥是它到怎么用,简单易懂地一步步讲。

啥是 ThreadPoolExecutor?

想象你在开饭店,生意火爆,点单的人太多,你一个人忙不过来。咋办?雇几个服务员,组成一个“服务员小队”,让他们一起招呼客人。你不用管每个服务员具体干啥,只管把任务(比如“给1号桌送菜”)扔给小队,他们自己会分工干活。ThreadPoolExecutor 就是这个“服务员小队”,它是一个线程池,帮你管理一堆线程,自动分配任务,避免你手动创建和控制每个线程的麻烦。

简单说:它是一个多线程的“任务分发器”,省心又高效。

基础用法:开个小队干活

  1. 最简单的起步

咱先用 ThreadPoolExecutor 跑几个任务:

from concurrent.futures import ThreadPoolExecutor
import time

def worker(name):
    print(f"工人 {name} 开始干活")
    time.sleep(1)  # 假装干活1秒
    return f"工人 {name} 干完了"

# 创建一个线程池
with ThreadPoolExecutor(max_workers=2) as executor:
    # 扔两个任务给线程池
    future1 = executor.submit(worker, "张三")
    future2 = executor.submit(worker, "李四")
    # 等结果
    print(future1.result())
    print(future2.result())

输出:

工人 张三 开始干活
工人 李四 开始干活
(等1秒)
工人 张三 干完了
工人 李四 干完了
  • ThreadPoolExecutor(max_workers=2):开个线程池,最多2个线程(服务员)。
  • executor.submit(函数, 参数):扔一个任务给线程池,返回一个 Future 对象。
  • future.result():等着拿任务结果(阻塞直到任务完成)。
  1. 啥是 Future?

Future 就像你给服务员的任务单,上面写着“啥时候干完我来取结果”。可以用它检查任务状态或拿结果。

中级用法:批量干活

  1. 扔一堆任务

如果任务多,手动一个个 submit 太累,可以用 map:

from concurrent.futures import ThreadPoolExecutor
import time

def worker(name):
    time.sleep(1)
    return f"工人 {name} 干完了"

with ThreadPoolExecutor(max_workers=2) as executor:
    names = ["张三", "李四", "王五"]
    results = executor.map(worker, names)  # 批量扔任务
    for result in results:
        print(result)

输出:

(等1秒)
工人 张三 干完了
工人 李四 干完了
(再等1秒)
工人 王五 干完了
  • executor.map(函数, 可迭代对象):把一堆参数扔给函数,线程池自动分配。
  • 总耗时2秒(2个线程并行干,3个任务分两波)。
  1. 不等结果:异步跑 有时候你不想等着,可以让任务自己跑:
from concurrent.futures import ThreadPoolExecutor
import time

def worker(name):
    time.sleep(1)
    print(f"工人 {name} 干完了")

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(worker, "张三")
    executor.submit(worker, "李四")
    print("我先干别的,不等你们")
  • 不调用 result(),主线程就不用等,任务在后台跑。

高级用法:更灵活地玩

  1. 检查状态和回调

可以用 Future 检查任务状态,或者加个“干完通知我”的回调:

from concurrent.futures import ThreadPoolExecutor
import time

def worker(name):
    time.sleep(1)
    return f"工人 {name} 干完了"

def done_callback(future):
    print("任务完成了,结果是:", future.result())

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(worker, "张三")
    future.add_done_callback(done_callback)  # 加回调
    print("任务状态:", future.done())  # False,还没干完
    time.sleep(2)
    print("任务状态:", future.done())  # True,干完了

输出:

任务状态: False
(等1秒)
任务完成了,结果是: 工人 张三 干完了
(再等1秒)
任务状态: True
  • future.done():检查任务是否完成。
  • adddonecallback(函数):任务一完成就自动调用这个函数。
  1. 处理异常 任务崩了咋办?Future 能捕获异常:
from concurrent.futures import ThreadPoolExecutor

def risky_worker():
    raise ValueError("我崩了!")

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(risky_worker)
    try:
        future.result()  # 拿结果
    except Exception as e:
        print("抓到错误:", e)

输出:

抓到错误: 我崩了!
  • result() 会抛出任务里的异常,方便处理。
  1. 控制线程数 max_workers 是线程池大小,调大调小看需求:
  • 太小(比如1):任务排队,效率低。
  • 太大(比如100):线程切换开销大,可能更慢。 通常设成 CPU 核心数(os.cpu_count())或者任务数的两三倍,具体看情况试。

实际场景:下载文件 假设你有几个文件要下载:

from concurrent.futures import ThreadPoolExecutor
import time

def download_file(url):
    print(f"开始下载 {url}")
    time.sleep(2)  # 模拟下载
    return f"{url} 下载完成"

urls = ["文件1", "文件2", "文件3"]
with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(download_file, urls)
    for result in results:
        print(result)

输出:

开始下载 文件1
开始下载 文件2
(等2秒)
文件1 下载完成
文件2 下载完成
开始下载 文件3
(再等2秒)
文件3 下载完成

2个线程并行,总耗时4秒,比单线程6秒快多了。

总结:ThreadPoolExecutor 的精髓

  • 基础:用 submit 扔单个任务,拿 Future 等结果。
  • 中级:用 map 批量跑任务,或者不等结果异步跑。
  • 高级:加回调、处理异常、调线程数,灵活应对复杂场景。

它比手动 threading.Thread 省事,像个自动化的“任务管家”。适合 CPU 密集任务少(因为 GIL 限制),但 I/O 密集任务(比如下载、读写文件)很香。

文档信息

版权声明:可自由转载(请注明转载出处)-非商用-非衍生

发表时间:2025年3月2日 21:04