# CPU-bound: 4 threads vs. a single thread -- almost no difference on a GIL build.
def cpu_task(n: int = 1_000_000) -> int:
    """Return the sum of squares ``0*0 + 1*1 + ... + (n-1)*(n-1)``.

    Pure-Python arithmetic that holds the GIL the whole time, so running
    several copies in threads gives no speedup on a standard CPython build.

    Args:
        n: Number of terms; defaults to 1_000_000 (the original workload).

    Returns:
        ``sum(i * i for i in range(n))``; 0 when ``n <= 0``.
    """
    return sum(i * i for i in range(n))
# Run cpu_task in 4 threads and time the whole batch.
start = time.perf_counter()
threads = [threading.Thread(target=cpu_task) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# The threads take turns holding the GIL, so the wall time is about the same
# as 4 sequential calls (exact numbers are machine-dependent).
print(f"4 threads CPU: {time.perf_counter() - start:.2f}s")  # e.g. ~2.5s (no speedup)
# Run io_task in 4 threads and time the whole batch.
start = time.perf_counter()
threads = [threading.Thread(target=io_task) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Blocking IO releases the GIL, so the 4 waits overlap.
# NOTE(review): io_task is defined outside this snippet; the ~0.1s figure
# presumably assumes it blocks for ~0.1s per call.
print(f"4 threads IO: {time.perf_counter() - start:.2f}s")  # ~0.1s (4x speedup)
二、threading 模块:底层同步原语
2.1 核心同步原语一览
flowchart TD
A["🎯 同步需求"] --> B{"选择什么?"}
B -->|"原子操作<br/>互斥"| C["🔒 Lock"]
B -->|"可重入<br/>递归"| D["🔄 RLock"]
B -->|"等待条件<br/>信号"| E["⏳ Condition"]
B -->|"资源池<br/>限流"| F["🚦 Semaphore"]
B -->|"事件触发<br/>一次性"| G["📡 Event"]
B -->|"生产者<br/>消费者"| H["📬 Queue"]
C --> I["threading.Lock()"]
D --> J["threading.RLock()"]
E --> K["threading.Condition()"]
F --> L["threading.Semaphore(n)"]
G --> M["threading.Event()"]
H --> N["queue.Queue(maxsize)"]
style A fill:#FFB3C6,stroke:#F48FB1,color:#333
style B fill:#FFF9C4,stroke:#F9A825,color:#333
style I fill:#C7CEEA,stroke:#9FA8DA,color:#333
style J fill:#C7CEEA,stroke:#9FA8DA,color:#333
style K fill:#B5EAD7,stroke:#80CBC4,color:#333
style L fill:#FFDAB9,stroke:#FFAB76,color:#333
style M fill:#E8D5F5,stroke:#CE93D8,color:#333
style N fill:#B5EAD7,stroke:#80CBC4,color:#333
2.2 Lock:保护临界区
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
import threading
counter = 0
lock = threading.Lock()


def increment(n: int = 100_000) -> None:
    """Add 1 to the module-level ``counter`` *n* times, each step under ``lock``.

    ``counter += 1`` is a read-modify-write sequence rather than an atomic
    operation. Holding ``lock`` around it keeps concurrent callers from
    overwriting each other's updates.

    Args:
        n: Number of increments; defaults to 100_000.
    """
    global counter
    for _ in range(n):
        with lock:  # acquired on entry, released on exit (even if an exception occurs)
            counter += 1
# Four threads each call increment(); the lock keeps every update intact.
threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Counter: {counter}")  # 400000 (correct)
# Without the lock the total *can* come out below 400000 (race condition);
# whether a given run loses updates depends on when threads get switched.
# Usage example.
# NOTE(review): AsyncLLMCaller is not defined in this snippet; confirm its
# definition (max_workers / call(..., timeout=...)) before running this.
caller = AsyncLLMCaller(max_workers=3)
result = caller.call("What is AI?", timeout=10)
print(result)
2.5 Semaphore:信号量
控制同时访问资源的线程数量(如控制数据库连接数):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
import threading
import time

# Allow at most 3 threads to call the API at the same time.
semaphore = threading.Semaphore(3)


def call_api(i: int, delay: float = 1) -> None:
    """Simulate an API call made while holding one of ``semaphore``'s 3 permits.

    Args:
        i: Thread index; used only in the log messages.
        delay: Seconds to sleep to simulate the call's latency; defaults to 1.
    """
    with semaphore:  # blocks while 3 other threads already hold a permit
        print(f"Thread {i} acquired semaphore")
        time.sleep(delay)  # simulated API call
        # Logged just before the `with` block exits and returns the permit.
        print(f"Thread {i} released semaphore")
# Start 10 threads; the semaphore lets at most 3 run call_api at once.
threads = [threading.Thread(target=call_api, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# 10 tasks, at most 3 at a time, ~1s each -> ceil(10 / 3) = 4 rounds, ~4s total.
# Thread pool with 5 worker threads.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # submit + as_completed: results are yielded in completion order,
    # not submission order.
    futures = {executor.submit(fetch_data, url): url for url in urls}
    for future in concurrent.futures.as_completed(futures):
        # result() re-raises any exception fetch_data raised in the worker.
        result = future.result()
        print(f"Got: {result['url']}")

# Elapsed: 20 * 0.1s / 5 = 0.4s (vs. 2.0s sequential)
# NOTE(review): assumes 20 URLs and ~0.1s per fetch_data call; both are
# defined outside this snippet.
3.2 ProcessPoolExecutor:CPU 密集型的多进程
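绕过 GIL,利用多核 CPU。注意:在使用 spawn 启动方式的平台上(Windows、macOS 默认),创建进程池的代码必须放在 `if __name__ == "__main__":` 保护块中,否则子进程导入主模块时会再次执行这段代码并报错: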
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
import concurrent.futures
def cpu_bound_task(n: int) -> int:
    """CPU-bound work: return the sum of ``i * i`` for ``i`` in ``range(n)``.

    Defined at module top level so ProcessPoolExecutor can pickle it by its
    qualified name and worker processes can import it.
    """
    return sum(i * i for i in range(n))
# Use 4 worker processes.
# The __main__ guard is required with the "spawn" start method (the default
# on Windows and macOS): each worker re-imports this module, and without the
# guard it would re-run this pool-creation code during import and fail.
if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(cpu_bound_task, 1_000_000) for _ in range(8)
        ]
        # Completion order varies between runs; irrelevant here since we only sum.
        results = [f.result() for f in concurrent.futures.as_completed(futures)]
        print(f"Got {len(results)} CPU results")
        print(f"Sum of squares: {sum(results)}")
3.3 executor.map:简洁的任务提交
1 2 3 4 5 6
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # map yields results in submission order (the order of urls), even when
    # later calls finish first; a worker's exception is re-raised on iteration.
    results = executor.map(fetch_data, urls)
    for result in results:
        print(result)
四、决策表:何时用线程、进程还是 asyncio?
flowchart TD
A["任务类型判断"] --> B{"主要特征?"}
B -->|"等待 IO<br/>网络/磁盘"| C["🧵 线程池<br/>ThreadPoolExecutor"]
B -->|"大量 CPU 计算<br/>数据处理"| D["⚙️ 进程池<br/>ProcessPoolExecutor"]
B -->|"高并发 IO<br/>万级别连接"| E["⚡ asyncio<br/>单线程异步"]
B -->|"混合任务<br/>CPU + IO"| F["🔄 进程池 + asyncio<br/>分层架构"]
style A fill:#FFB3C6,stroke:#F48FB1,color:#333
style B fill:#FFF9C4,stroke:#F9A825,color:#333
style C fill:#C7CEEA,stroke:#9FA8DA,color:#333
style D fill:#B5EAD7,stroke:#80CBC4,color:#333
style E fill:#E8D5F5,stroke:#CE93D8,color:#333
style F fill:#FFDAB9,stroke:#FFAB76,color:#333
| 场景 | 推荐方案 | 原因 | 加速比 |
|---|---|---|---|
| LLM API 调用(IO 阻塞) | asyncio + httpx/aiohttp | 高并发、低内存 | 10-100x |
| 多线程爬虫 | ThreadPoolExecutor | IO 密集,阻塞 IO 期间释放 GIL | 5-20x |
| 大量数据预处理(CPU) | ProcessPoolExecutor | 绕过 GIL、多核利用 | 最多约 Nx(N=核数) |
| 混合任务(LLM + 预处理) | ProcessPoolExecutor + asyncio | 分层解耦 | 取决于瓶颈 |
| 简单并发任务 | ThreadPoolExecutor | 简单直观 | - |
4.1 asyncio vs threading:不是替代,是互补
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
import asyncio
import httpx


# asyncio: handle thousands of concurrent IO operations within a single thread.
async def fetch_all(urls: list[str]) -> list[httpx.Response]:
    """GET every URL concurrently through one shared ``httpx.AsyncClient``.

    Args:
        urls: The URLs to fetch.

    Returns:
        The responses, in the same order as ``urls`` (asyncio.gather keeps
        argument order). If any request raises, gather propagates that
        exception.
    """
    # NOTE(review): httpx applies a connection-pool limit and a pool-acquire
    # timeout by default; with very many URLs, requests queued behind the pool
    # may time out. Consider capping concurrency (e.g. asyncio.Semaphore) --
    # confirm against the httpx resource-limit/timeout docs.
    async with httpx.AsyncClient() as client:
        # The client must stay open until every request has completed,
        # so gather is awaited inside the `async with`.
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return responses