A complete FastAPI + Celery + Redis integration example, covering configuration, Docker deployment, and project structure:

Project structure

fastapi-celery-redis/
├── app/
│   ├── __init__.py
│   ├── config/
│   │   ├── __init__.py
│   │   └── settings.py
│   ├── main.py
│   ├── routers/
│   │   ├── __init__.py
│   │   └── tasks.py
│   └── tasks/
│       ├── __init__.py
│       └── celery_app.py
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── start.sh

1. Configuration (app/config/settings.py)

from pydantic import BaseSettings  # pydantic v1; in v2 this moved to the pydantic-settings package

class Settings(BaseSettings):
    redis_url: str = "redis://redis:6379/0"
    celery_broker_url: str = "redis://redis:6379/0"
    celery_result_backend: str = "redis://redis:6379/0"

    class Config:
        env_file = ".env"

settings = Settings()
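Each field can also be overridden through environment variables or the `.env` file named in `Config`. An illustrative `.env` pointing at a local Redis (the values are examples, not the project defaults):

```
REDIS_URL=redis://localhost:6379/1
CELERY_BROKER_URL=redis://localhost:6379/1
CELERY_RESULT_BACKEND=redis://localhost:6379/1
```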

2. Celery setup (app/tasks/celery_app.py)

from celery import Celery
from app.config.settings import settings

celery_app = Celery(
    "worker",
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["app.tasks.celery_app"],
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
)

3. FastAPI entry point (app/main.py)

from fastapi import FastAPI
from app.routers import tasks

app = FastAPI(title="FastAPI Celery Example")
app.include_router(tasks.router)

@app.get("/health")
def health_check():
    return {"status": "healthy"}
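The /health route only proves the web process is alive. A deeper check could also ping Redis; a minimal sketch, where `check_redis` is a hypothetical helper and `client` would be built with `redis.Redis.from_url(settings.redis_url)`:

```python
def check_redis(client) -> bool:
    """Return True if the broker answers PING; swallow connection errors."""
    try:
        return bool(client.ping())
    except Exception:
        return False

# In app/main.py this could back a hypothetical /health/deep endpoint:
# @app.get("/health/deep")
# def deep_health_check():
#     client = Redis.from_url(settings.redis_url)
#     return {"redis": check_redis(client)}
```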

4. Routes (app/routers/tasks.py)

from fastapi import APIRouter
from app.tasks.celery_app import celery_app
from celery.result import AsyncResult

router = APIRouter(prefix="/tasks", tags=["tasks"])

@router.post("/add")
async def add_numbers(a: int, b: int):
    task = celery_app.send_task("add_numbers", args=[a, b])
    return {"task_id": task.id}

@router.get("/result/{task_id}")
async def get_task_result(task_id: str):
    task_result = AsyncResult(task_id, app=celery_app)
    return {
        "task_id": task_id,
        "status": task_result.status,
        "result": task_result.result,
    }

5. Celery task (append to app/tasks/celery_app.py)

import time

@celery_app.task(name="add_numbers")
def add_numbers(a: int, b: int):
    time.sleep(5)  # simulate a slow operation
    return a + b

6. Dockerfile

FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

7. docker-compose.yml

version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    networks:
      - app-network

  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    networks:
      - app-network

  worker:
    build: .
    command: celery -A app.tasks.celery_app worker --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    networks:
      - app-network

volumes:
  redis_data:

networks:
  app-network:

8. requirements.txt

fastapi>=0.68.0
uvicorn>=0.15.0
celery>=5.2.0
redis>=4.3.0
pydantic>=1.8.0,<2.0  # BaseSettings moved to pydantic-settings in v2
python-dotenv>=0.19.0

9. Startup script (start.sh)

#!/bin/bash

# Build and start all services
docker-compose up --build

Usage

  1. Build and start the services:

    chmod +x start.sh
    ./start.sh

  2. Test the API:

    # Submit a task (a and b are query parameters, not a JSON body)
    curl -X POST "http://localhost:8000/tasks/add?a=5&b=3"

    # Fetch the result (replace <task_id> with the actual task_id)
    curl "http://localhost:8000/tasks/result/<task_id>"
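The submit-then-poll flow can also be scripted from Python. A sketch using only the standard library: `poll_result` is a generic helper (not part of the project above), and the `fetch_status` callable is assumed to wrap an HTTP GET against `/tasks/result/{task_id}`:

```python
import time

def poll_result(fetch_status, task_id: str, timeout: float = 30.0, interval: float = 0.5):
    """Call fetch_status(task_id) until the task reaches a terminal state."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        payload = fetch_status(task_id)  # e.g. GET /tasks/result/<task_id>, parsed as JSON
        if payload["status"] in ("SUCCESS", "FAILURE"):
            return payload
        time.sleep(interval)
    raise TimeoutError(f"task {task_id} still pending after {timeout}s")
```

With `requests` installed, `fetch_status` could be as small as `lambda tid: requests.get(f"http://localhost:8000/tasks/result/{tid}").json()`.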

This implementation has the following features:

  1. Configuration management with pydantic
  2. A modular project structure
  3. A complete Docker deployment
  4. Redis serving as both message broker and result backend
  5. A health-check endpoint
  6. Asynchronous task-status queries
  7. A simulated 5-second delay to demonstrate async processing
  8. Network isolation and persistent storage
  9. Environment-variable configuration support

Adjust to your needs:

  • Task types and workflows
  • Redis configuration parameters
  • Celery worker count and settings
  • Log format and level
  • Timeout and retry policies
  • Security middleware, etc.