sending email async with celery
This commit is contained in:
Binary file not shown.
BIN
__pycache__/celery_worker.cpython-313.pyc
Normal file
BIN
__pycache__/celery_worker.cpython-313.pyc
Normal file
Binary file not shown.
29
app.py
29
app.py
@@ -1,5 +1,5 @@
|
|||||||
import random
|
import random
|
||||||
from flask import Flask, send_from_directory
|
from flask import Flask, send_from_directory, jsonify
|
||||||
from flask_migrate import Migrate
|
from flask_migrate import Migrate
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
import os
|
import os
|
||||||
@@ -14,6 +14,7 @@ import click
|
|||||||
from utils import timeago
|
from utils import timeago
|
||||||
from extensions import db, login_manager, csrf
|
from extensions import db, login_manager, csrf
|
||||||
from utils.email_templates import create_default_templates
|
from utils.email_templates import create_default_templates
|
||||||
|
from celery_worker import init_celery, celery
|
||||||
|
|
||||||
# Load environment variables
|
# Load environment variables
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
@@ -35,6 +36,9 @@ def create_app():
|
|||||||
login_manager.login_view = 'auth.login'
|
login_manager.login_view = 'auth.login'
|
||||||
csrf.init_app(app)
|
csrf.init_app(app)
|
||||||
|
|
||||||
|
# Initialize Celery
|
||||||
|
init_celery(app)
|
||||||
|
|
||||||
@app.context_processor
|
@app.context_processor
|
||||||
def inject_csrf_token():
|
def inject_csrf_token():
|
||||||
return dict(csrf_token=generate_csrf())
|
return dict(csrf_token=generate_csrf())
|
||||||
@@ -48,13 +52,32 @@ def create_app():
|
|||||||
def load_user(user_id):
|
def load_user(user_id):
|
||||||
return User.query.get(int(user_id))
|
return User.query.get(int(user_id))
|
||||||
|
|
||||||
|
# Health check endpoint
|
||||||
|
@app.route('/health')
|
||||||
|
def health_check():
|
||||||
|
try:
|
||||||
|
# Check database connection
|
||||||
|
db.session.execute('SELECT 1')
|
||||||
|
# Check Redis connection
|
||||||
|
celery.control.inspect().ping()
|
||||||
|
return jsonify({
|
||||||
|
'status': 'healthy',
|
||||||
|
'database': 'connected',
|
||||||
|
'redis': 'connected'
|
||||||
|
}), 200
|
||||||
|
except Exception as e:
|
||||||
|
return jsonify({
|
||||||
|
'status': 'unhealthy',
|
||||||
|
'error': str(e)
|
||||||
|
}), 500
|
||||||
|
|
||||||
# Initialize routes
|
# Initialize routes
|
||||||
from routes import init_app
|
from routes import init_app
|
||||||
init_app(app)
|
init_app(app)
|
||||||
app.register_blueprint(room_files_bp, url_prefix='/api/rooms')
|
app.register_blueprint(room_files_bp, url_prefix='/api/rooms')
|
||||||
|
app.register_blueprint(user_bp, url_prefix='/api/users')
|
||||||
app.register_blueprint(room_members_bp, url_prefix='/api/rooms')
|
app.register_blueprint(room_members_bp, url_prefix='/api/rooms')
|
||||||
app.register_blueprint(trash_bp, url_prefix='/api/rooms')
|
app.register_blueprint(trash_bp, url_prefix='/api/trash')
|
||||||
app.register_blueprint(user_bp)
|
|
||||||
|
|
||||||
@app.cli.command("cleanup-trash")
|
@app.cli.command("cleanup-trash")
|
||||||
def cleanup_trash_command():
|
def cleanup_trash_command():
|
||||||
|
|||||||
51
celery_worker.py
Normal file
51
celery_worker.py
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
from celery import Celery
|
||||||
|
from flask import current_app
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Get Redis URL from environment variable or use default
|
||||||
|
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
|
||||||
|
|
||||||
|
# Configure Celery
|
||||||
|
celery = Celery(
|
||||||
|
'docupulse',
|
||||||
|
backend=REDIS_URL,
|
||||||
|
broker=REDIS_URL,
|
||||||
|
# Add some default configuration
|
||||||
|
task_serializer='json',
|
||||||
|
accept_content=['json'],
|
||||||
|
result_serializer='json',
|
||||||
|
timezone='UTC',
|
||||||
|
enable_utc=True,
|
||||||
|
# Add retry configuration
|
||||||
|
task_acks_late=True,
|
||||||
|
task_reject_on_worker_lost=True,
|
||||||
|
task_default_retry_delay=300, # 5 minutes
|
||||||
|
task_max_retries=3
|
||||||
|
)
|
||||||
|
|
||||||
|
def init_celery(app):
|
||||||
|
"""Initialize Celery with Flask app context"""
|
||||||
|
celery.conf.update(app.config)
|
||||||
|
|
||||||
|
class ContextTask(celery.Task):
|
||||||
|
"""Celery task that runs within Flask app context"""
|
||||||
|
def __call__(self, *args, **kwargs):
|
||||||
|
with app.app_context():
|
||||||
|
return self.run(*args, **kwargs)
|
||||||
|
|
||||||
|
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
||||||
|
"""Handle task failure"""
|
||||||
|
logger.error(f'Task {task_id} failed: {exc}')
|
||||||
|
super().on_failure(exc, task_id, args, kwargs, einfo)
|
||||||
|
|
||||||
|
def on_retry(self, exc, task_id, args, kwargs, einfo):
|
||||||
|
"""Handle task retry"""
|
||||||
|
logger.warning(f'Task {task_id} is being retried: {exc}')
|
||||||
|
super().on_retry(exc, task_id, args, kwargs, einfo)
|
||||||
|
|
||||||
|
celery.Task = ContextTask
|
||||||
|
return celery
|
||||||
@@ -12,11 +12,18 @@ services:
|
|||||||
- POSTGRES_USER=postgres
|
- POSTGRES_USER=postgres
|
||||||
- POSTGRES_PASSWORD=postgres
|
- POSTGRES_PASSWORD=postgres
|
||||||
- POSTGRES_DB=docupulse
|
- POSTGRES_DB=docupulse
|
||||||
|
- REDIS_URL=redis://redis:6379/0
|
||||||
volumes:
|
volumes:
|
||||||
- uploads:/app/uploads
|
- uploads:/app/uploads
|
||||||
depends_on:
|
depends_on:
|
||||||
- db
|
- db
|
||||||
|
- redis
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
|
||||||
|
interval: 30s
|
||||||
|
timeout: 10s
|
||||||
|
retries: 3
|
||||||
deploy:
|
deploy:
|
||||||
resources:
|
resources:
|
||||||
limits:
|
limits:
|
||||||
@@ -32,6 +39,48 @@ services:
|
|||||||
volumes:
|
volumes:
|
||||||
- postgres_data:/var/lib/postgresql/data
|
- postgres_data:/var/lib/postgresql/data
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD-SHELL", "pg_isready -U postgres"]
|
||||||
|
interval: 30s
|
||||||
|
timeout: 10s
|
||||||
|
retries: 3
|
||||||
|
|
||||||
|
redis:
|
||||||
|
image: redis:7
|
||||||
|
ports:
|
||||||
|
- "6379:6379"
|
||||||
|
restart: unless-stopped
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "redis-cli", "ping"]
|
||||||
|
interval: 30s
|
||||||
|
timeout: 10s
|
||||||
|
retries: 3
|
||||||
|
|
||||||
|
celery_worker:
|
||||||
|
build: .
|
||||||
|
command: celery -A celery_worker.celery worker --loglevel=info
|
||||||
|
environment:
|
||||||
|
- FLASK_APP=app.py
|
||||||
|
- FLASK_ENV=production
|
||||||
|
- DATABASE_URL=postgresql://postgres:postgres@db:5432/docupulse
|
||||||
|
- REDIS_URL=redis://redis:6379/0
|
||||||
|
volumes:
|
||||||
|
- .:/app
|
||||||
|
depends_on:
|
||||||
|
- web
|
||||||
|
- redis
|
||||||
|
- db
|
||||||
|
restart: unless-stopped
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "celery", "-A", "celery_worker.celery", "inspect", "ping"]
|
||||||
|
interval: 30s
|
||||||
|
timeout: 10s
|
||||||
|
retries: 3
|
||||||
|
deploy:
|
||||||
|
resources:
|
||||||
|
limits:
|
||||||
|
cpus: '0.5'
|
||||||
|
memory: 512M
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
postgres_data:
|
postgres_data:
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ echo "POSTGRES_USER: $POSTGRES_USER"
|
|||||||
echo "POSTGRES_PASSWORD: $POSTGRES_PASSWORD"
|
echo "POSTGRES_PASSWORD: $POSTGRES_PASSWORD"
|
||||||
echo "POSTGRES_DB: $POSTGRES_DB"
|
echo "POSTGRES_DB: $POSTGRES_DB"
|
||||||
echo "DATABASE_URL: $DATABASE_URL"
|
echo "DATABASE_URL: $DATABASE_URL"
|
||||||
|
echo "REDIS_URL: $REDIS_URL"
|
||||||
|
|
||||||
# Wait for the database to be ready
|
# Wait for the database to be ready
|
||||||
echo "Waiting for database to be ready..."
|
echo "Waiting for database to be ready..."
|
||||||
@@ -14,6 +15,13 @@ while ! nc -z db 5432; do
|
|||||||
done
|
done
|
||||||
echo "Database is ready!"
|
echo "Database is ready!"
|
||||||
|
|
||||||
|
# Wait for Redis to be ready
|
||||||
|
echo "Waiting for Redis to be ready..."
|
||||||
|
while ! nc -z redis 6379; do
|
||||||
|
sleep 0.1
|
||||||
|
done
|
||||||
|
echo "Redis is ready!"
|
||||||
|
|
||||||
# Wait for PostgreSQL to be ready to accept connections
|
# Wait for PostgreSQL to be ready to accept connections
|
||||||
echo "Waiting for PostgreSQL to accept connections..."
|
echo "Waiting for PostgreSQL to accept connections..."
|
||||||
until PGPASSWORD=$POSTGRES_PASSWORD psql -h db -U $POSTGRES_USER -d $POSTGRES_DB -c '\q'; do
|
until PGPASSWORD=$POSTGRES_PASSWORD psql -h db -U $POSTGRES_USER -d $POSTGRES_DB -c '\q'; do
|
||||||
|
|||||||
@@ -1,12 +1,17 @@
|
|||||||
Flask==3.0.2
|
Flask>=2.0.0
|
||||||
Flask-SQLAlchemy==3.1.1
|
Flask-SQLAlchemy>=3.0.0
|
||||||
Flask-Login==0.6.3
|
Flask-Login>=0.6.0
|
||||||
Flask-WTF==1.2.1
|
Flask-WTF>=1.0.0
|
||||||
Flask-Migrate==4.0.5
|
Flask-Migrate>=4.0.0
|
||||||
SQLAlchemy==2.0.23
|
SQLAlchemy>=1.4.0
|
||||||
Werkzeug==3.0.1
|
Werkzeug>=2.0.0
|
||||||
WTForms==3.1.1
|
WTForms==3.1.1
|
||||||
python-dotenv==1.0.1
|
python-dotenv>=0.19.0
|
||||||
psycopg2-binary==2.9.9
|
psycopg2-binary==2.9.9
|
||||||
gunicorn==21.2.0
|
gunicorn==21.2.0
|
||||||
email_validator==2.1.0.post1
|
email_validator==2.1.0.post1
|
||||||
|
celery>=5.3.0
|
||||||
|
redis>=4.5.0
|
||||||
|
alembic>=1.7.0
|
||||||
|
flower>=2.0.0
|
||||||
|
prometheus-client>=0.16.0
|
||||||
Binary file not shown.
@@ -1,14 +1,16 @@
|
|||||||
from flask import request
|
from flask import request
|
||||||
from models import Notif, NotifType, db, EmailTemplate, Mail, KeyValueSettings
|
from models import Notif, NotifType, db, EmailTemplate, Mail, KeyValueSettings, User
|
||||||
from typing import Optional, Dict, Any, List
|
from typing import Optional, Dict, Any, List
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from flask_login import current_user
|
from flask_login import current_user
|
||||||
from sqlalchemy import desc
|
from sqlalchemy import desc, and_
|
||||||
import logging
|
import logging
|
||||||
import smtplib
|
import smtplib
|
||||||
from email.mime.text import MIMEText
|
from email.mime.text import MIMEText
|
||||||
from email.mime.multipart import MIMEMultipart
|
from email.mime.multipart import MIMEMultipart
|
||||||
|
from email.utils import formatdate
|
||||||
import json
|
import json
|
||||||
|
from celery_worker import celery
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -29,57 +31,65 @@ def get_smtp_settings() -> Optional[Dict[str, Any]]:
|
|||||||
logger.error(f"Error retrieving SMTP settings: {str(e)}")
|
logger.error(f"Error retrieving SMTP settings: {str(e)}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def send_email_via_smtp(mail: Mail) -> bool:
|
@celery.task
|
||||||
"""
|
def send_email_task(mail_id: int):
|
||||||
Send an email using the configured SMTP settings.
|
"""Celery task to send an email asynchronously"""
|
||||||
|
try:
|
||||||
Args:
|
# Get the mail record
|
||||||
mail: The Mail object containing the email details
|
mail = Mail.query.get(mail_id)
|
||||||
|
if not mail:
|
||||||
Returns:
|
logger.error(f"Mail record not found for ID: {mail_id}")
|
||||||
bool: True if email was sent successfully, False otherwise
|
return False
|
||||||
"""
|
|
||||||
smtp_settings = get_smtp_settings()
|
# Get SMTP settings
|
||||||
if not smtp_settings:
|
smtp_settings = get_smtp_settings()
|
||||||
logger.error("Cannot send email: SMTP settings not configured")
|
if not smtp_settings:
|
||||||
|
logger.error("SMTP settings not found")
|
||||||
|
mail.status = 'failed'
|
||||||
|
mail.error_message = "SMTP settings not found"
|
||||||
|
db.session.commit()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
|
||||||
# Create message
|
# Create message
|
||||||
msg = MIMEMultipart()
|
msg = MIMEMultipart()
|
||||||
msg['From'] = f"{smtp_settings['smtp_from_name']} <{smtp_settings['smtp_from_email']}>"
|
msg['From'] = smtp_settings.sender_email
|
||||||
msg['To'] = mail.recipient
|
msg['To'] = mail.recipient_email
|
||||||
msg['Subject'] = mail.subject
|
msg['Subject'] = mail.subject
|
||||||
|
msg['Date'] = formatdate(localtime=True)
|
||||||
|
|
||||||
# Attach HTML body
|
# Add HTML content
|
||||||
msg.attach(MIMEText(mail.body, 'html'))
|
msg.attach(MIMEText(mail.content, 'html'))
|
||||||
|
|
||||||
# Create SMTP connection
|
|
||||||
if smtp_settings['smtp_security'] == 'ssl':
|
|
||||||
server = smtplib.SMTP_SSL(smtp_settings['smtp_host'], int(smtp_settings['smtp_port']))
|
|
||||||
else:
|
|
||||||
server = smtplib.SMTP(smtp_settings['smtp_host'], int(smtp_settings['smtp_port']))
|
|
||||||
if smtp_settings['smtp_security'] == 'tls':
|
|
||||||
server.starttls()
|
|
||||||
|
|
||||||
# Login if credentials are provided
|
|
||||||
if smtp_settings['smtp_username'] and smtp_settings['smtp_password']:
|
|
||||||
server.login(smtp_settings['smtp_username'], smtp_settings['smtp_password'])
|
|
||||||
|
|
||||||
# Send email
|
# Send email
|
||||||
|
with smtplib.SMTP(smtp_settings.smtp_server, smtp_settings.smtp_port) as server:
|
||||||
|
if smtp_settings.use_tls:
|
||||||
|
server.starttls()
|
||||||
|
if smtp_settings.username and smtp_settings.password:
|
||||||
|
server.login(smtp_settings.username, smtp_settings.password)
|
||||||
server.send_message(msg)
|
server.send_message(msg)
|
||||||
server.quit()
|
|
||||||
|
|
||||||
# Update mail status
|
# Update mail status
|
||||||
mail.status = 'sent'
|
mail.status = 'sent'
|
||||||
mail.sent_at = datetime.utcnow()
|
mail.sent_at = datetime.utcnow()
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
logger.info(f"Email sent successfully to {mail.recipient}")
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error sending email: {str(e)}")
|
logger.error(f"Error sending email: {str(e)}")
|
||||||
|
if mail:
|
||||||
|
mail.status = 'failed'
|
||||||
|
mail.error_message = str(e)
|
||||||
|
db.session.commit()
|
||||||
|
return False
|
||||||
|
|
||||||
|
def send_email_via_smtp(mail: Mail) -> bool:
|
||||||
|
"""Queue an email to be sent asynchronously"""
|
||||||
|
try:
|
||||||
|
# Queue the email sending task
|
||||||
|
send_email_task.delay(mail.id)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error queueing email: {str(e)}")
|
||||||
mail.status = 'failed'
|
mail.status = 'failed'
|
||||||
mail.error_message = str(e)
|
mail.error_message = str(e)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|||||||
Reference in New Issue
Block a user