FastAPI通过Background Tasks提供了一种轻量级异步解决方案,允许开发者在请求响应后继续执行非即时性任务。

一、核心原理

1.1 底层机制解析

FastAPI的Background Tasks并非基于多线程或多进程,而是利用Python的asyncio事件循环实现异步执行。其核心实现位于starlette.background模块中,具体流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
# Simplified excerpt of the Starlette source (starlette.background).
class BackgroundTask:
    """Bundle a callable with its arguments for execution after the response.

    The flattened indentation of the original snippet has been restored so the
    example is runnable as shown.
    """

    def __init__(self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs

    async def __call__(self) -> None:
        # Coroutine functions are awaited directly on the event loop;
        # plain (sync) functions are offloaded to a worker thread so they
        # do not block the loop.
        if iscoroutinefunction(self.func):
            await self.func(*self.args, **self.kwargs)
        else:
            await run_in_threadpool(self.func, *self.args, **self.kwargs)

关键点:

  • 协程优先:自动检测任务是否为async函数
  • 线程池降级:同步函数通过anyio.to_thread.run_sync在独立线程执行
  • 生命周期绑定:任务与请求上下文共享生命周期

1.2 与Celery等队列系统的区别

| 特性 | FastAPI Background Tasks | Celery |
| --- | --- | --- |
| 任务持久化 | ❌ 内存存储,重启丢失 | ✅ 支持数据库/RabbitMQ |
| 分布式处理 | ❌ 单机执行 | ✅ 支持集群 |
| 执行延迟 | 立即执行 | 可设置延迟 |
| 复杂度 | ⭐️ | ⭐️⭐️⭐️ |
| 适用场景 | 短时、非关键任务 | 长时、关键任务 |

二、开发实践

2.1 任务管理案例

场景:用户上传CSV文件后,后台解析数据并生成统计报告,完成后发送通知邮件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from fastapi import FastAPI, UploadFile, BackgroundTasks
from pydantic import EmailStr
import pandas as pd
import io
from sqlalchemy.orm import Session
from .database import SessionLocal # 确保从数据库模块导入SessionLocal
from .models import UserReport
from .email import send_notification
from .generate_stats import generate_statistics # 导入生成统计报告的函数

app = FastAPI()

async def process_report(
    contents: bytes,  # raw file bytes — the UploadFile must not outlive the request
    user_email: EmailStr,
):
    """Parse an uploaded CSV, persist the generated report, then notify the user.

    Runs as a background task, so it owns its database session rather than
    reusing the request-scoped one.
    """
    session = SessionLocal()  # fresh session: the task outlives the request
    try:
        # Decode the CSV payload into a DataFrame.
        frame = pd.read_csv(io.BytesIO(contents))

        # Build the statistics report.
        stats = generate_statistics(frame)

        # Persist the report row.
        report = UserReport(
            email=user_email,
            data=stats.json(),  # assumes the report object exposes .json()
        )
        session.add(report)
        session.commit()

        # Fire the asynchronous notification e-mail.
        await send_notification(
            email=user_email,
            message=f"Report {report.id} is ready",
        )
    except Exception as exc:
        session.rollback()
        # Extend here with task-specific error handling if needed.
        raise exc
    finally:
        session.close()  # always release the session

@app.post("/upload-report/")
async def upload_report(
    file: UploadFile,
    email: EmailStr,
    background_tasks: BackgroundTasks,
):
    """Accept a CSV upload and schedule background report generation.

    Bug fix: the original passed the ``UploadFile`` object to
    ``process_report``, which expects ``bytes`` — and the upload's temp file
    is closed once the response is sent, so the task would fail anyway.
    The contents are therefore read *before* scheduling the task.
    """
    contents = await file.read()  # read now; the file object dies with the request
    background_tasks.add_task(
        process_report,
        contents,  # pass plain bytes, not the UploadFile
        email,
    )
    return {"status": "processing_started"}

2.2 异常处理与日志监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from contextlib import suppress
from logging import getLogger

logger = getLogger(__name__)

async def safe_task_wrapper(func, *args, **kwargs):
    """Await *func*, logging and alerting on failure instead of propagating it.

    On success the task's return value is passed through; on failure the
    error is logged, a best-effort alert is sent, and ``None`` is returned.
    """
    try:
        outcome = await func(*args, **kwargs)
    except Exception as exc:
        logger.error(f"Task failed: {str(exc)}", exc_info=True)
        # Best-effort alert: never let the alert itself crash the wrapper.
        with suppress(Exception):
            await send_alert_email(f"Task Error: {type(exc).__name__}")
    else:
        return outcome
    finally:
        logger.info("Task completed with cleanup")

# Wrap the task to add error handling and logging
background_tasks.add_task(safe_task_wrapper, process_report, file, email)

三、性能优化策略

3.1 资源限制模式

通过信号量控制并发任务数:

1
2
3
4
5
6
7
8
9
10
11
# NOTE(review): create_task_group is imported but unused in this snippet.
from anyio import create_task_group, Semaphore

# Cap on how many rate-limited tasks may run concurrently.
MAX_CONCURRENT_TASKS = 5
# Shared semaphore guarding all rate-limited background tasks.
task_semaphore = Semaphore(MAX_CONCURRENT_TASKS)

async def rate_limited_task(func, *args, **kwargs):
    """Run *func* while holding the shared semaphore, capping concurrency."""
    await task_semaphore.acquire()
    try:
        return await func(*args, **kwargs)
    finally:
        task_semaphore.release()

# When registering the task
background_tasks.add_task(rate_limited_task, heavy_processing, data)

3.2 任务优先级队列

实现加权优先级调度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import heapq
from fastapi import FastAPI, UploadFile, BackgroundTasks
from typing import Optional, Callable, Any
import threading

app = FastAPI()

class PriorityBackgroundTasks(BackgroundTasks):
    """BackgroundTasks variant that drains tasks in priority order.

    Lower ``priority`` values run first; equal priorities run FIFO.
    """

    def __init__(self):
        super().__init__()
        # Heap of (priority, seq, (func, args, kwargs)).
        self._priority_queue = []
        self._lock = threading.Lock()
        # Bug fix: without a tiebreaker, two entries with equal priority would
        # compare their function objects and raise TypeError on heappush.
        self._seq = 0

    def add_task(
        self,
        func: Callable[..., Any],
        *args: Any,
        priority: int = 0,  # default 0; lower value == higher priority
        **kwargs: Any
    ) -> None:
        """Queue *func* with the given *priority* (thread-safe)."""
        with self._lock:
            self._seq += 1
            heapq.heappush(self._priority_queue, (priority, self._seq, (func, args, kwargs)))

    async def __call__(self) -> None:
        """Pop and execute queued tasks, highest priority first."""
        while True:
            # Bug fix: check emptiness and pop under the same lock so a
            # concurrent consumer cannot pop from an already-empty heap.
            with self._lock:
                if not self._priority_queue:
                    break
                _, _, (func, args, kwargs) = heapq.heappop(self._priority_queue)
            await self.run_func(func, *args, **kwargs)

    async def run_func(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """Await coroutine functions; run sync functions in the thread pool."""
        # Bug fix: BackgroundTasks has no is_async/run_in_threadpool methods —
        # use inspect and Starlette's real helper instead.
        from inspect import iscoroutinefunction
        from starlette.concurrency import run_in_threadpool
        if iscoroutinefunction(func):
            await func(*args, **kwargs)
        else:
            await run_in_threadpool(func, *args, **kwargs)

# Usage example
@app.post("/upload-report/")
async def upload_report(
    file: UploadFile,
    email: str,
    background_tasks: Optional[PriorityBackgroundTasks] = None
):
    """Schedule an urgent and a normal report task for the uploaded file.

    Bug fix: the original used ``background_tasks: PriorityBackgroundTasks =
    PriorityBackgroundTasks()`` — a mutable default argument, so every request
    shared (and kept growing) one queue instance. A fresh instance is now
    created per request.

    NOTE(review): FastAPI only auto-runs the BackgroundTasks instance it
    injects itself; a hand-built instance must be executed explicitly (e.g.
    ``await background_tasks()``) or attached to the response — confirm the
    intended wiring.
    """
    if background_tasks is None:
        background_tasks = PriorityBackgroundTasks()

    contents = await file.read()

    # Add a high-priority task (priority=0)
    background_tasks.add_task(
        process_urgent_report,
        contents,
        email,
        priority=0
    )

    # Add a normal-priority task (priority=1)
    background_tasks.add_task(
        process_report,
        contents,
        email,
        priority=1
    )

    return {"status": "processing_started"}

# Example task functions
async def process_urgent_report(contents: bytes, email: str):
    """Placeholder for the high-priority report pipeline."""
    print("Processing urgent report...")
    # Implement the concrete logic here

async def process_report(contents: bytes, email: str):
    """Placeholder for the normal-priority report pipeline."""
    print("Processing normal report...")
    # Implement the concrete logic here

四、测试与调试技巧

4.1 单元测试方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from fastapi.testclient import TestClient
# Bug fix: AsyncMock was used below but never imported.
from unittest.mock import patch, AsyncMock

def test_background_task():
    """Verify the upload endpoint succeeds and awaits its notification task."""
    client = TestClient(app)
    mock_send = AsyncMock()

    with patch("module.send_notification", mock_send):
        response = client.post("/upload-report/", ...)
        assert response.status_code == 200

        # Bug fix: the original called asyncio.get_event_loop() without
        # importing asyncio. No manual wait is needed anyway — Starlette's
        # TestClient runs background tasks before returning the response.
        mock_send.assert_awaited_once()

4.2 实时监控实现

集成Prometheus监控:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Bug fix: time was used below but never imported.
import time

from prometheus_client import Counter, Histogram

TASK_STARTED = Counter("bg_tasks_started", "Background tasks started")
TASK_DURATION = Histogram("bg_task_duration", "Task execution time")
# Bug fix: TASK_ERRORS was referenced but never defined.
TASK_ERRORS = Counter("bg_task_errors", "Background task errors", ["type"])

async def monitored_task(func, *args, **kwargs):
    """Run *func* while recording start/duration/error metrics in Prometheus."""
    TASK_STARTED.inc()
    start_time = time.time()

    try:
        result = await func(*args, **kwargs)
        duration = time.time() - start_time
        TASK_DURATION.observe(duration)
        return result
    except Exception as e:  # bug fix: the exception must be bound for type(e)
        TASK_ERRORS.labels(type=type(e).__name__).inc()
        raise