diff --git a/__pycache__/app.cpython-313.pyc b/__pycache__/app.cpython-313.pyc index 5b5c186..22801ae 100644 Binary files a/__pycache__/app.cpython-313.pyc and b/__pycache__/app.cpython-313.pyc differ diff --git a/__pycache__/celery_worker.cpython-313.pyc b/__pycache__/celery_worker.cpython-313.pyc new file mode 100644 index 0000000..6e95e30 Binary files /dev/null and b/__pycache__/celery_worker.cpython-313.pyc differ diff --git a/app.py b/app.py index 14980b1..dd0fa74 100644 --- a/app.py +++ b/app.py @@ -1,5 +1,5 @@ import random -from flask import Flask, send_from_directory +from flask import Flask, send_from_directory, jsonify from flask_migrate import Migrate from dotenv import load_dotenv import os @@ -14,6 +14,7 @@ import click from utils import timeago from extensions import db, login_manager, csrf from utils.email_templates import create_default_templates +from celery_worker import init_celery, celery # Load environment variables load_dotenv() @@ -35,6 +36,9 @@ def create_app(): login_manager.login_view = 'auth.login' csrf.init_app(app) + # Initialize Celery + init_celery(app) + @app.context_processor def inject_csrf_token(): return dict(csrf_token=generate_csrf()) @@ -48,13 +52,32 @@ def create_app(): def load_user(user_id): return User.query.get(int(user_id)) + # Health check endpoint + @app.route('/health') + def health_check(): + try: + # Check database connection + db.session.execute('SELECT 1') + # Check Redis connection + celery.control.inspect().ping() + return jsonify({ + 'status': 'healthy', + 'database': 'connected', + 'redis': 'connected' + }), 200 + except Exception as e: + return jsonify({ + 'status': 'unhealthy', + 'error': str(e) + }), 500 + # Initialize routes from routes import init_app init_app(app) app.register_blueprint(room_files_bp, url_prefix='/api/rooms') + app.register_blueprint(user_bp, url_prefix='/api/users') app.register_blueprint(room_members_bp, url_prefix='/api/rooms') - app.register_blueprint(trash_bp, url_prefix='/api/rooms') - app.register_blueprint(user_bp) + app.register_blueprint(trash_bp, url_prefix='/api/trash') @app.cli.command("cleanup-trash") def cleanup_trash_command(): diff --git a/celery_worker.py b/celery_worker.py new file mode 100644 index 0000000..9baafef --- /dev/null +++ b/celery_worker.py @@ -0,0 +1,51 @@ +from celery import Celery +from flask import current_app +import os +import logging + +# Configure logging +logger = logging.getLogger(__name__) + +# Get Redis URL from environment variable or use default +REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0') + +# Configure Celery +celery = Celery( + 'docupulse', + backend=REDIS_URL, + broker=REDIS_URL, + # Add some default configuration + task_serializer='json', + accept_content=['json'], + result_serializer='json', + timezone='UTC', + enable_utc=True, + # Add retry configuration + task_acks_late=True, + task_reject_on_worker_lost=True, + task_default_retry_delay=300, # 5 minutes + task_max_retries=3 +) + +def init_celery(app): + """Initialize Celery with Flask app context""" + celery.conf.update(app.config) + + class ContextTask(celery.Task): + """Celery task that runs within Flask app context""" + def __call__(self, *args, **kwargs): + with app.app_context(): + return self.run(*args, **kwargs) + + def on_failure(self, exc, task_id, args, kwargs, einfo): + """Handle task failure""" + logger.error(f'Task {task_id} failed: {exc}') + super().on_failure(exc, task_id, args, kwargs, einfo) + + def on_retry(self, exc, task_id, args, kwargs, einfo): + """Handle task retry""" + logger.warning(f'Task {task_id} is being retried: {exc}') + super().on_retry(exc, task_id, args, kwargs, einfo) + + celery.Task = ContextTask + return celery \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index b463db0..d01c748 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,11 +12,18 @@ services: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres - POSTGRES_DB=docupulse + - REDIS_URL=redis://redis:6379/0 volumes: - uploads:/app/uploads depends_on: - db + - redis restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:5000/health"] + interval: 30s + timeout: 10s + retries: 3 deploy: resources: limits: @@ -32,6 +39,48 @@ services: volumes: - postgres_data:/var/lib/postgresql/data restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 30s + timeout: 10s + retries: 3 + + redis: + image: redis:7 + ports: + - "6379:6379" + restart: unless-stopped + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 30s + timeout: 10s + retries: 3 + + celery_worker: + build: . + command: celery -A celery_worker.celery worker --loglevel=info + environment: + - FLASK_APP=app.py + - FLASK_ENV=production + - DATABASE_URL=postgresql://postgres:postgres@db:5432/docupulse + - REDIS_URL=redis://redis:6379/0 + volumes: + - .:/app + depends_on: + - web + - redis + - db + restart: unless-stopped + healthcheck: + test: ["CMD", "celery", "-A", "celery_worker.celery", "inspect", "ping"] + interval: 30s + timeout: 10s + retries: 3 + deploy: + resources: + limits: + cpus: '0.5' + memory: 512M volumes: postgres_data: diff --git a/entrypoint.sh b/entrypoint.sh index 2e6b99b..83aaf1e 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -6,6 +6,7 @@ echo "POSTGRES_USER: $POSTGRES_USER" echo "POSTGRES_PASSWORD: $POSTGRES_PASSWORD" echo "POSTGRES_DB: $POSTGRES_DB" echo "DATABASE_URL: $DATABASE_URL" +echo "REDIS_URL: $REDIS_URL" # Wait for the database to be ready echo "Waiting for database to be ready..." @@ -14,6 +15,13 @@ while ! nc -z db 5432; do done echo "Database is ready!" +# Wait for Redis to be ready +echo "Waiting for Redis to be ready..." +while ! nc -z redis 6379; do + sleep 0.1 +done +echo "Redis is ready!" + # Wait for PostgreSQL to be ready to accept connections echo "Waiting for PostgreSQL to accept connections..." until PGPASSWORD=$POSTGRES_PASSWORD psql -h db -U $POSTGRES_USER -d $POSTGRES_DB -c '\q'; do diff --git a/requirements.txt b/requirements.txt index e5572b5..63bbdd7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,17 @@ -Flask==3.0.2 -Flask-SQLAlchemy==3.1.1 -Flask-Login==0.6.3 -Flask-WTF==1.2.1 -Flask-Migrate==4.0.5 -SQLAlchemy==2.0.23 -Werkzeug==3.0.1 +Flask>=2.0.0 +Flask-SQLAlchemy>=3.0.0 +Flask-Login>=0.6.0 +Flask-WTF>=1.0.0 +Flask-Migrate>=4.0.0 +SQLAlchemy>=1.4.0 +Werkzeug>=2.0.0 WTForms==3.1.1 -python-dotenv==1.0.1 +python-dotenv>=0.19.0 psycopg2-binary==2.9.9 gunicorn==21.2.0 -email_validator==2.1.0.post1 \ No newline at end of file +email_validator==2.1.0.post1 +celery>=5.3.0 +redis>=4.5.0 +alembic>=1.7.0 +flower>=2.0.0 +prometheus-client>=0.16.0 \ No newline at end of file diff --git a/utils/__pycache__/notification.cpython-313.pyc b/utils/__pycache__/notification.cpython-313.pyc index b711e82..c6b6d5d 100644 Binary files a/utils/__pycache__/notification.cpython-313.pyc and b/utils/__pycache__/notification.cpython-313.pyc differ diff --git a/utils/notification.py b/utils/notification.py index aa41a9d..20e7846 100644 --- a/utils/notification.py +++ b/utils/notification.py @@ -1,14 +1,16 @@ from flask import request -from models import Notif, NotifType, db, EmailTemplate, Mail, KeyValueSettings +from models import Notif, NotifType, db, EmailTemplate, Mail, KeyValueSettings, User from typing import Optional, Dict, Any, List from datetime import datetime, timedelta from flask_login import current_user -from sqlalchemy import desc +from sqlalchemy import desc, and_ import logging import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart +from email.utils import formatdate import json +from celery_worker import celery logger = logging.getLogger(__name__) @@ -29,57 +31,65 @@ def get_smtp_settings() -> Optional[Dict[str, Any]]: logger.error(f"Error retrieving SMTP settings: {str(e)}") return None -def send_email_via_smtp(mail: Mail) -> bool: - """ - Send an email using the configured SMTP settings. - - Args: - mail: The Mail object containing the email details - - Returns: - bool: True if email was sent successfully, False otherwise - """ - smtp_settings = get_smtp_settings() - if not smtp_settings: - logger.error("Cannot send email: SMTP settings not configured") - return False - +@celery.task +def send_email_task(mail_id: int): + """Celery task to send an email asynchronously""" try: + # Get the mail record + mail = Mail.query.get(mail_id) + if not mail: + logger.error(f"Mail record not found for ID: {mail_id}") + return False + + # Get SMTP settings + smtp_settings = get_smtp_settings() + if not smtp_settings: + logger.error("SMTP settings not found") + mail.status = 'failed' + mail.error_message = "SMTP settings not found" + db.session.commit() + return False + # Create message msg = MIMEMultipart() - msg['From'] = f"{smtp_settings['smtp_from_name']} <{smtp_settings['smtp_from_email']}>" - msg['To'] = mail.recipient + msg['From'] = smtp_settings.sender_email + msg['To'] = mail.recipient_email msg['Subject'] = mail.subject + msg['Date'] = formatdate(localtime=True) - # Attach HTML body - msg.attach(MIMEText(mail.body, 'html')) - - # Create SMTP connection - if smtp_settings['smtp_security'] == 'ssl': - server = smtplib.SMTP_SSL(smtp_settings['smtp_host'], int(smtp_settings['smtp_port'])) - else: - server = smtplib.SMTP(smtp_settings['smtp_host'], int(smtp_settings['smtp_port'])) - if smtp_settings['smtp_security'] == 'tls': - server.starttls() - - # Login if credentials are provided - if smtp_settings['smtp_username'] and smtp_settings['smtp_password']: - server.login(smtp_settings['smtp_username'], smtp_settings['smtp_password']) + # Add HTML content + msg.attach(MIMEText(mail.content, 'html')) # Send email - server.send_message(msg) - server.quit() + with smtplib.SMTP(smtp_settings.smtp_server, smtp_settings.smtp_port) as server: + if smtp_settings.use_tls: + server.starttls() + if smtp_settings.username and smtp_settings.password: + server.login(smtp_settings.username, smtp_settings.password) + server.send_message(msg) # Update mail status mail.status = 'sent' mail.sent_at = datetime.utcnow() db.session.commit() - - logger.info(f"Email sent successfully to {mail.recipient}") return True except Exception as e: logger.error(f"Error sending email: {str(e)}") + if mail: + mail.status = 'failed' + mail.error_message = str(e) + db.session.commit() + return False + +def send_email_via_smtp(mail: Mail) -> bool: + """Queue an email to be sent asynchronously""" + try: + # Queue the email sending task + send_email_task.delay(mail.id) + return True + except Exception as e: + logger.error(f"Error queueing email: {str(e)}") mail.status = 'failed' mail.error_message = str(e) db.session.commit()