1. Core Architecture
1.1 Celery Component Architecture
```mermaid
graph LR
    A[FastAPI] -->|AMQP| B[Message Broker]
    B --> C[Celery Worker]
    C --> D[Result Backend]
```
Core components (a minimal wiring sketch follows the list):
- Producer: the FastAPI application that submits tasks
- Broker: the message transport middleware (RabbitMQ/Redis)
- Worker: the cluster of task-executing nodes
- Result Backend: storage for task state and results (Redis/PostgreSQL)
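The sketch below maps each role onto concrete settings, assuming RabbitMQ as the broker and Redis as the result backend; the URLs and the `ping_broker` task are placeholders.

```python
from celery import Celery

# Producer and Worker share one Celery app definition: FastAPI imports it to
# enqueue tasks, the worker process imports it to consume them.
celery = Celery(
    "demo",
    broker="amqp://guest:guest@rabbitmq:5672//",  # Broker: RabbitMQ over AMQP
    backend="redis://redis:6379/1",               # Result Backend: Redis
)

@celery.task
def ping_broker() -> str:
    # Runs on a Worker node; the return value is written to the result backend
    return "pong"
```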
1.2 Task Lifecycle Management
```python
@app.post("/tasks")
async def create_task(data: TaskData):
    task = process_data.delay(data.dict())
    return {"task_id": task.id}

@celery.task(bind=True, max_retries=3)
def process_data(self, data):
    try:
        result = heavy_computation(data)
        update_database(result)
    except DatabaseError as exc:
        self.retry(exc=exc)
```
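To complete the lifecycle on the read side, a status endpoint can poll the result backend through `AsyncResult`; a minimal sketch reusing the `app` and `celery` instances from the surrounding examples:

```python
from celery.result import AsyncResult

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    result = AsyncResult(task_id, app=celery)
    payload = {"task_id": task_id, "state": result.state}  # PENDING/STARTED/SUCCESS/...
    if result.successful():
        payload["result"] = result.result
    elif result.failed():
        payload["error"] = str(result.result)  # the stored exception
    return payload
```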
2. Production-Grade Integration
2.1 Project Structure Conventions
```text
project/
├── app/
│   ├── core/
│   │   └── celery.py
│   ├── tasks/
│   │   ├── email.py
│   │   └── report.py
│   └── api/
│       └── tasks.py
└── worker/
    └── entrypoint.sh
```
2.2 Celery Factory Pattern Implementation
```python
from celery import Celery
from app.config import settings

def create_celery():
    celery = Celery(
        __name__,
        broker=settings.CELERY_BROKER_URL,
        backend=settings.CELERY_RESULT_BACKEND,
        include=["app.tasks.email", "app.tasks.report"]
    )
    celery.conf.update({
        'task_serializer': 'json',
        'result_serializer': 'json',
        'accept_content': ['json'],
        'timezone': 'Asia/Shanghai',
        'enable_utc': True,
        'task_track_started': True,
        'task_acks_late': True,
        'broker_connection_retry_on_startup': True
    })
    return celery

celery = create_celery()
```
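API routes then import this single instance (through the task modules it includes) rather than building their own; a sketch for `app/api/tasks.py`, where `send_welcome_email` is a hypothetical task defined in `app.tasks.email`:

```python
from fastapi import APIRouter
from app.tasks.email import send_welcome_email  # hypothetical task in the included module

router = APIRouter()

@router.post("/emails")
async def queue_welcome_email(address: str):
    # The route only enqueues; a worker started with `-A app.core.celery` executes it
    task = send_welcome_email.delay(address)
    return {"task_id": task.id}
```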
3. A Complex Task Processing Case
3.1 Asynchronous Order Payment Processing
Business requirements:
- After receiving the payment callback, lock the inventory
- Generate an electronic invoice
- Send an order confirmation email
- Run compensation logic on failure (see the `link_error` sketch after the API endpoint below)
```python
@celery.task(bind=True, acks_late=True, retry_backoff=True)
def process_order_payment(self, order_id: str):
    try:
        order = Order.get(order_id)

        lock_inventory(order)
        self.update_state(state='INVENTORY_LOCKED')

        invoice = generate_invoice(order)
        self.update_state(state='INVOICE_GENERATED')

        send_confirmation_email(order.user_email, invoice)
        return {"status": "completed"}
    except InventoryLockError as e:
        self.retry(exc=e, max_retries=3)
    except InvoiceGenerationError:
        unlock_inventory(order)
        raise
```
API endpoint:
| @app.post("/orders/{order_id}/process") async def trigger_order_processing( order_id: UUID, background_tasks: BackgroundTasks ): task = process_order_payment.apply_async(args=[str(order_id)]) return {"task_id": task.id}
|
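For the compensation requirement, one option (a sketch, not from the original) is to attach an error callback with `link_error`; `release_order_resources` is a hypothetical helper, and the errback reads the original arguments from the failed task's request:

```python
@celery.task
def compensate_order(request, exc, traceback):
    # Celery calls error callbacks with the failing task's request, exception and traceback
    order_id = request.args[0]
    release_order_resources(order_id)  # hypothetical compensation helper

task = process_order_payment.apply_async(
    args=[str(order_id)],
    link_error=compensate_order.s(),
)
```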
4. Advanced Configuration Techniques
4.1 Task Routing and Queue Separation
```python
celery.conf.task_routes = {
    'app.tasks.email.*': {'queue': 'email'},
    'app.tasks.report.*': {'queue': 'report'},
    'app.tasks.order.*': {'queue': 'critical'}
}
```

Start a dedicated worker for the critical queue:

```bash
celery -A app.core.celery worker -Q critical -c 4 -n critical_worker.%h
```
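Declaring the queues explicitly keeps workers and the broker in agreement, and routing can still be pinned per call; a sketch using `kombu.Queue` (the `generate_report` call and `report_id` are illustrative):

```python
from kombu import Queue

# Declare the queues referenced by task_routes so they are created predictably
celery.conf.task_queues = (
    Queue('email'),
    Queue('report'),
    Queue('critical'),
)

# One-off override: send a single call to a specific queue
generate_report.apply_async(args=[report_id], queue='report')
```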
4.2 Task Result Optimization
Optimizing the Redis storage structure:
```python
import json

from kombu.serialization import register

def custom_serializer(obj):
    ...

register(
    'custom_serializer',
    custom_serializer,   # encoder: object -> str/bytes
    json.loads,          # decoder: str/bytes -> object
    content_type='application/x-custom',
    content_encoding='utf-8'
)

celery.conf.result_serializer = 'custom_serializer'
```
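Independently of the serializer, the result backend has settings that keep Redis memory in check; the values below are illustrative:

```python
celery.conf.result_expires = 3600        # seconds before stored results are deleted
celery.conf.result_compression = 'gzip'  # compress large result payloads
celery.conf.result_extended = True       # also store task name/args for easier debugging
```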
5. Monitoring and Debugging
5.1 Distributed Tracing Integration
```python
from opentelemetry import trace
from opentelemetry.instrumentation.celery import CeleryInstrumentor

CeleryInstrumentor().instrument()

@celery.task
def tracked_task():
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("custom_operation"):
        perform_operation()
```
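With prefork workers, the opentelemetry-instrumentation-celery documentation recommends instrumenting inside each worker process via the `worker_process_init` signal rather than at import time; a sketch:

```python
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor

@worker_process_init.connect(weak=False)
def init_tracing(*args, **kwargs):
    # Runs once in every forked worker process, so each child is instrumented
    CeleryInstrumentor().instrument()
```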
5.2 Intelligent Exception Handling
```python
class TaskFailureHandler:
    @classmethod
    def handle_retryable(cls, task, exc):
        # Transient infrastructure errors: let Celery schedule a retry
        if isinstance(exc, (NetworkError, DatabaseConnectionError)):
            raise task.retry(exc=exc)
        # Business validation errors: log and return a terminal result
        elif isinstance(exc, BusinessValidationError):
            log_error(exc)
            return {"status": "invalid"}
        # Everything else: alert and re-raise so the task is marked FAILED
        else:
            notify_critical_failure(exc)
            raise exc
```
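Usage inside a bound task then looks like this; the `sync_external_system` task and `push_to_partner_api` helper are hypothetical:

```python
@celery.task(bind=True, max_retries=3)
def sync_external_system(self, payload: dict):
    try:
        return push_to_partner_api(payload)  # hypothetical external call
    except Exception as exc:
        # Delegate the retry/terminate decision to the shared handler
        return TaskFailureHandler.handle_retryable(self, exc)
```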
6. Performance Optimization in Practice
6.1 Prefetch Optimization
Worker configuration:
```python
celery.conf.worker_prefetch_multiplier = 4    # tasks reserved per worker process
celery.conf.worker_max_tasks_per_child = 100  # recycle processes to cap memory growth
```
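These values suit short, uniform tasks; for queues dominated by long-running jobs, Celery's optimizing guide points in the opposite direction. A sketch of such a profile (values are illustrative):

```python
celery.conf.worker_prefetch_multiplier = 1         # don't reserve tasks a busy worker can't start soon
celery.conf.task_acks_late = True                  # redeliver the task if the worker dies mid-run
celery.conf.worker_max_memory_per_child = 200_000  # KiB; recycle a child that grows past this
```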
6.2 Dynamic Scaling
Kubernetes HPA configuration:
```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Pods
    pods:
      metric:
        name: celery_queue_length
      target:
        type: AverageValue
        averageValue: 100
```
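`celery_queue_length` is a custom metric, so something must publish it. A minimal sketch of an exporter that reads queue depths from a Redis broker with `LLEN` and exposes them for Prometheus; the host, port, and queue names are placeholders, and a metrics adapter (e.g. prometheus-adapter) is still needed to surface the metric to the HPA:

```python
import time

import redis
from prometheus_client import Gauge, start_http_server

QUEUES = ["email", "report", "critical"]
queue_length = Gauge("celery_queue_length", "Pending tasks per Celery queue", ["queue"])

def main():
    client = redis.Redis(host="redis", port=6379, db=0)
    start_http_server(9540)  # Prometheus scrape endpoint
    while True:
        for name in QUEUES:
            # With the Redis broker, each queue is a Redis list named after the queue
            queue_length.labels(queue=name).set(client.llen(name))
        time.sleep(15)

if __name__ == "__main__":
    main()
```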
7. Security Hardening
7.1 Task Message Signing
```python
celery.conf.update(
    security_key='/etc/ssl/private/worker.key',        # private key used to sign messages
    security_certificate='/etc/ssl/certs/worker.pem',  # this node's certificate
    security_cert_store='/etc/ssl/certs/*.pem',        # certificates trusted for verification
    security_digest='sha256',
    task_serializer='auth',                            # sign outgoing task messages
    accept_content=['auth'],                           # reject unsigned content
)
celery.setup_security()  # enables the 'auth' serializer on this app

@celery.task
def sensitive_operation(data):
    # All task messages are signed/verified once the auth serializer is active
    ...
```
7.2 Transport Encryption
```python
import ssl

celery.conf.broker_use_ssl = {
    'ca_certs': '/path/to/ca.pem',
    'certfile': '/path/to/client.crt',
    'keyfile': '/path/to/client.key',
    'cert_reqs': ssl.CERT_REQUIRED
}
```
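If the result backend is Redis over TLS, it is configured separately from the broker via Celery's `redis_backend_use_ssl` setting; a sketch reusing the same certificate paths:

```python
import ssl

celery.conf.redis_backend_use_ssl = {
    'ssl_ca_certs': '/path/to/ca.pem',
    'ssl_certfile': '/path/to/client.crt',
    'ssl_keyfile': '/path/to/client.key',
    'ssl_cert_reqs': ssl.CERT_REQUIRED,
}
```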
With FastAPI and Celery deeply integrated, we can build a modern distributed system with the following characteristics:
- Elastic scaling: the number of Workers adjusts dynamically with load
- Fault isolation: each business queue is processed independently
- Observability: a complete monitoring and tracing stack
- Enterprise-grade security: end-to-end protection
Points to watch in real deployments:
- Choose a message broker that matches the business (RabbitMQ is recommended for financial-grade reliability, Redis for high-performance scenarios)
- Establish a solid dead-letter queue handling mechanism (see the sketch after this list)
- Run load tests and failure drills on a regular schedule
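For the dead-letter point, a sketch of a RabbitMQ setup using `kombu` queue arguments; the exchange and queue names (`dlx`, `critical.dlq`) are illustrative. Messages rejected or expired on the critical queue are rerouted to `critical.dlq` for inspection and replay:

```python
from kombu import Exchange, Queue

dead_letter_exchange = Exchange('dlx', type='direct')

celery.conf.task_queues = (
    Queue(
        'critical',
        queue_arguments={
            'x-dead-letter-exchange': 'dlx',
            'x-dead-letter-routing-key': 'critical.dlq',
        },
    ),
    # The parking queue bound to the dead-letter exchange
    Queue('critical.dlq', dead_letter_exchange, routing_key='critical.dlq'),
    # Other queues (email, report) omitted for brevity
)
```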