51 lines
1.5 KiB
Python
51 lines
1.5 KiB
Python
from celery import Celery
|
|
from flask import current_app
|
|
import os
|
|
import logging
|
|
|
|
# Redis connection string: taken from the REDIS_URL environment variable when
# it is set, otherwise a Redis instance on localhost (port 6379, database 0).
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
# Shared Celery application for the 'docupulse' project. The same Redis URL
# is used for the message broker and for the result backend.
celery = Celery(
    'docupulse',
    broker=REDIS_URL,
    backend=REDIS_URL,
    # Serialization: JSON only, for task messages and for results.
    accept_content=['json'],
    task_serializer='json',
    result_serializer='json',
    # Time handling: everything in UTC.
    enable_utc=True,
    timezone='UTC',
    # Delivery: late acknowledgement, and reject the message if the worker
    # process is lost (see the Celery configuration reference for the exact
    # redelivery semantics of these two settings).
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    # Retry defaults: 300 seconds (5 minutes) between attempts, 3 attempts.
    # NOTE(review): these two keys are not built-in Celery settings. They are
    # stored in ``celery.conf``, but per-task retry behaviour is governed by
    # ``Task.default_retry_delay`` / ``Task.max_retries`` -- confirm that
    # something actually consumes these values.
    task_default_retry_delay=300,
    task_max_retries=3,
)
|
|
|
|
def init_celery(app):
    """Bind the module-level Celery instance to a Flask application.

    Copies ``app.config`` into ``celery.conf`` and replaces ``celery.Task``
    with a subclass that runs every task body inside ``app.app_context()``
    and logs failures and retries.

    Args:
        app: The Flask application whose config and app context are used.

    Returns:
        The module-level ``celery`` instance.
    """
    celery.conf.update(app.config)

    # Capture the current base class once so the subclass below can fall
    # back to its retry defaults.
    base_task = celery.Task

    class ContextTask(base_task):
        """Celery task that runs within Flask app context"""

        # Celery has no built-in settings named ``task_default_retry_delay``
        # or ``task_max_retries``; retry behaviour is read from these class
        # attributes. Pull the values from the configuration so they take
        # effect, keeping the base class defaults when they are not set.
        default_retry_delay = celery.conf.get(
            'task_default_retry_delay', base_task.default_retry_delay
        )
        max_retries = celery.conf.get(
            'task_max_retries', base_task.max_retries
        )

        def __call__(self, *args, **kwargs):
            # Push the Flask application context for the duration of the
            # task body.
            with app.app_context():
                return self.run(*args, **kwargs)

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            """Handle task failure"""
            # Lazy %-formatting: the message is only built if it is emitted.
            logger.error('Task %s failed: %s', task_id, exc)
            super().on_failure(exc, task_id, args, kwargs, einfo)

        def on_retry(self, exc, task_id, args, kwargs, einfo):
            """Handle task retry"""
            logger.warning('Task %s is being retried: %s', task_id, exc)
            super().on_retry(exc, task_id, args, kwargs, einfo)

    # Tasks created through this Celery instance from now on use ContextTask
    # as their base class.
    celery.Task = ContextTask
    return celery