A complete FastAPI + Celery + Redis integration example, covering configuration, Docker deployment, and project structure:
Project structure

    fastapi-celery-redis/
    ├── app/
    │   ├── __init__.py
    │   ├── config/
    │   │   ├── __init__.py
    │   │   └── settings.py
    │   ├── main.py
    │   ├── routers/
    │   │   ├── __init__.py
    │   │   └── tasks.py
    │   └── tasks/
    │       ├── __init__.py
    │       └── celery_app.py
    ├── docker-compose.yml
    ├── Dockerfile
    ├── requirements.txt
    └── start.sh

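1. Configuration file (app/config/settings.py)

    # pydantic v1 API; in pydantic v2, BaseSettings lives in the separate
    # pydantic-settings package.
    from pydantic import BaseSettings


    class Settings(BaseSettings):
        # Each field can be overridden by an environment variable of the same
        # name (case-insensitive), e.g. REDIS_URL or CELERY_BROKER_URL.
        redis_url: str = "redis://redis:6379/0"
        celery_broker_url: str = "redis://redis:6379/0"
        celery_result_backend: str = "redis://redis:6379/0"

        class Config:
            env_file = ".env"


    settings = Settings()
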
2. Celery configuration (app/tasks/celery_app.py)

    from celery import Celery

    from app.config.settings import settings

    celery_app = Celery(
        "worker",
        broker=settings.celery_broker_url,
        backend=settings.celery_result_backend,
        include=["app.tasks.celery_app"],
    )

    celery_app.conf.update(
        task_serializer="json",
        accept_content=["json"],
        result_serializer="json",
        timezone="UTC",
        enable_utc=True,
    )

3. FastAPI main file (app/main.py)

    from fastapi import FastAPI

    from app.routers import tasks

    app = FastAPI(title="FastAPI Celery Example")
    app.include_router(tasks.router)


    @app.get("/health")
    def health_check():
        return {"status": "healthy"}

4. Router file (app/routers/tasks.py)

    from celery.result import AsyncResult
    from fastapi import APIRouter

    from app.tasks.celery_app import celery_app

    router = APIRouter(prefix="/tasks", tags=["tasks"])


    @router.post("/add")
    async def add_numbers(a: int, b: int):
        # a and b are plain int parameters, so FastAPI reads them from the query string.
        # send_task enqueues the task by its registered name and returns immediately.
        task = celery_app.send_task("add_numbers", args=[a, b])
        return {"task_id": task.id}


    @router.get("/result/{task_id}")
    async def get_task_result(task_id: str):
        task_result = AsyncResult(task_id, app=celery_app)
        return {
            "task_id": task_id,
            "status": task_result.status,
            "result": task_result.result,  # None while the task is still pending
        }

5. Celery task (add to app/tasks/celery_app.py)

    import time  # belongs with the other imports at the top of the module


    @celery_app.task(name="add_numbers")
    def add_numbers(a: int, b: int) -> int:
        time.sleep(5)  # simulate slow work
        return a + b

6. Dockerfile

    FROM python:3.9-slim

    WORKDIR /app

    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt

    COPY . .

    CMD ["sh", "-c", "uvicorn app.main:app --host 0.0.0.0 --port 8000"]

7. docker-compose.yml

    version: '3.8'

    services:
      redis:
        image: redis:7-alpine
        ports:
          - "6379:6379"
        volumes:
          - redis_data:/data
        networks:
          - app-network

      web:
        build: .
        ports:
          - "8000:8000"
        environment:
          - REDIS_URL=redis://redis:6379/0
        depends_on:
          - redis
        networks:
          - app-network

      worker:
        build: .
        command: celery -A app.tasks.celery_app worker --loglevel=info
        environment:
          - REDIS_URL=redis://redis:6379/0
        depends_on:
          - redis
          - web
        networks:
          - app-network

    volumes:
      redis_data:

    networks:
      app-network:

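The `depends_on` entries in the compose file only control start order; they do not wait until Redis actually accepts connections. If that matters for your setup, a small readiness check can run before the worker starts. The following is a minimal sketch using only the standard library; the function name `wait_for_port` and the timeout values are assumptions for illustration, not part of the original project.

```python
import socket
import sys
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect is enough; the socket is closed right away.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    # "redis" and 6379 are the service name and port from docker-compose.yml.
    sys.exit(0 if wait_for_port("redis", 6379) else 1)
```

Saved under a hypothetical name such as `wait_for_redis.py`, it could be chained in front of the worker command, for example `sh -c "python wait_for_redis.py && celery -A app.tasks.celery_app worker --loglevel=info"`. A successful TCP connect only shows that the port is open, so treat this as a coarse check.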
8. requirements.txt

    fastapi>=0.68.0
    uvicorn>=0.15.0
    celery>=5.2.0
    redis>=4.3.0
    # settings.py imports BaseSettings from pydantic, which only works on pydantic v1
    pydantic>=1.8.0,<2.0
    python-dotenv>=0.19.0

9. Startup script (start.sh)

    #!/bin/bash

    docker-compose up --build

Usage

Build and start the services:

    chmod +x start.sh
    ./start.sh

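Test the API:

    # Submit a task. The endpoint declares a and b as plain int parameters,
    # so FastAPI expects them in the query string, not in a JSON body.
    curl -X POST "http://localhost:8000/tasks/add?a=5&b=3"

    # Query the result, using the task_id returned by the previous call.
    curl "http://localhost:8000/tasks/result/<task_id>"
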
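The same submit-then-poll flow can be scripted. Below is a minimal sketch of a standard-library client, assuming the service is reachable at `http://localhost:8000` as published by the compose file. The helper names (`add_url`, `result_url`, `is_finished`, `submit_and_wait`) and the polling interval are illustrative assumptions, not part of the original project.

```python
import json
import time
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: port published by docker-compose

# Celery states after which the task result no longer changes.
FINISHED_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def add_url(a: int, b: int, base_url: str = BASE_URL) -> str:
    """Build the submit URL; a and b travel as query parameters."""
    query = urllib.parse.urlencode({"a": a, "b": b})
    return f"{base_url}/tasks/add?{query}"


def result_url(task_id: str, base_url: str = BASE_URL) -> str:
    """Build the URL of the result endpoint for one task."""
    return f"{base_url}/tasks/result/{urllib.parse.quote(task_id, safe='')}"


def is_finished(status: str) -> bool:
    return status in FINISHED_STATES


def fetch_json(url: str, method: str = "GET") -> dict:
    request = urllib.request.Request(url, method=method)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)


def submit_and_wait(a: int, b: int, poll_interval: float = 1.0, max_wait: float = 30.0) -> dict:
    """Submit a + b, then poll until the task finishes or max_wait elapses."""
    task_id = fetch_json(add_url(a, b), method="POST")["task_id"]
    deadline = time.monotonic() + max_wait
    while True:
        payload = fetch_json(result_url(task_id))
        if is_finished(payload["status"]) or time.monotonic() >= deadline:
            return payload
        time.sleep(poll_interval)


if __name__ == "__main__":
    print(submit_and_wait(5, 3))
```

Because the task sleeps for 5 seconds, the first few polls should report a non-final status before the result arrives. If `max_wait` elapses first, the function returns the last payload it saw, whatever its status.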
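This implementation has the following features:
- Configuration management with pydantic
- Modular project structure
- A complete Docker deployment setup
- Redis serves as both the message broker and the result backend
- A health check endpoint
- Task status can be queried while the task is still running
- A simulated 5-second delay to demonstrate asynchronous processing
- A dedicated Docker network and a named volume for persistent Redis data
- Configuration through environment variables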
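Adjust the following to fit your needs:
- Task types and workflows
- Redis configuration parameters
- Number of Celery workers and their configuration
- Log format and level
- Timeout and retry policies
- Security middleware, and so on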
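As one way to approach the timeout and worker items in the list above, the sketch below collects a few Celery settings in a dictionary and applies them to an app object. The setting names are standard Celery configuration keys. The values, the `CELERY_OVERRIDES` name, and the `apply_overrides` helper are assumptions chosen for illustration and should be tuned for a real workload.

```python
from typing import Any, Dict

# Illustrative values only.
CELERY_OVERRIDES: Dict[str, Any] = {
    "task_soft_time_limit": 25,       # seconds; raises SoftTimeLimitExceeded inside the task
    "task_time_limit": 30,            # seconds; hard limit, the worker process is killed
    "result_expires": 3600,           # seconds to keep task results in the backend
    "worker_prefetch_multiplier": 1,  # each worker process reserves one task at a time
    "task_acks_late": True,           # acknowledge a message after the task has run
}


def apply_overrides(celery_app: Any, overrides: Dict[str, Any] = CELERY_OVERRIDES) -> None:
    """Apply the overrides to any object exposing a Celery-style `conf.update`."""
    celery_app.conf.update(**overrides)
```

In this project it would be called as `apply_overrides(celery_app)` in `app/tasks/celery_app.py`, after the existing `celery_app.conf.update(...)` call. Retries are usually configured per task, for example through the `autoretry_for`, `retry_backoff`, and `max_retries` options of the task decorator.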