from flask import render_template, Blueprint, redirect, url_for, request, flash, Response, jsonify, session, current_app from flask_login import current_user, login_required from models import User, db, Room, RoomFile, RoomMemberPermission, SiteSettings, Event, Conversation, Message, MessageAttachment, Notif, EmailTemplate, Mail, KeyValueSettings, DocuPulseSettings, PasswordSetupToken, Instance, ManagementAPIKey from routes.auth import require_password_change import os from werkzeug.utils import secure_filename from sqlalchemy import func, case, literal_column, text from datetime import datetime, timedelta import logging import sys import time from forms import CompanySettingsForm from utils import log_event, create_notification, get_unread_count from io import StringIO import csv from flask_wtf.csrf import generate_csrf import json import smtplib import requests from functools import wraps import socket # Set up logging to show in console logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__) def init_routes(main_bp): @main_bp.context_processor def inject_site_settings(): site_settings = SiteSettings.query.first() return dict(site_settings=site_settings) @main_bp.context_processor def inject_unread_notifications(): if current_user.is_authenticated: unread_count = get_unread_count(current_user.id) return {'unread_notifications': unread_count} return {'unread_notifications': 0} @main_bp.route('/') def public_home(): """Public homepage for the master instance - client-facing website""" # Check if this is a master instance is_master = os.environ.get('MASTER', 'false').lower() == 'true' if is_master: # For master instance, show the public homepage return render_template('home.html') else: # For child instances, redirect to login if not authenticated, otherwise dashboard if current_user.is_authenticated: return redirect(url_for('main.dashboard')) else: return redirect(url_for('auth.login')) @main_bp.route('/dashboard') @login_required @require_password_change def dashboard(): logger.info("Loading dashboard...") # Get 3 most recent users recent_contacts = User.query.order_by(User.created_at.desc()).limit(3).all() # Count active and inactive users active_count = User.query.filter_by(is_active=True).count() inactive_count = User.query.filter_by(is_active=False).count() # Get recent notifications recent_notifications = Notif.query.filter_by(user_id=current_user.id).order_by(Notif.timestamp.desc()).limit(5).all() # Get recent events (last 7) if current_user.is_admin: recent_events = Event.query.order_by(Event.timestamp.desc()).limit(7).all() else: # Get events where user is the actor OR events from conversations they are a member of recent_events = Event.query.filter( db.or_( Event.user_id == current_user.id, # User's own actions db.and_( Event.event_type.in_(['conversation_create', 'message_create']), # Conversation-related events db.cast(text("(details->>'conversation_id')::integer"), db.Integer).in_( db.session.query(Conversation.id) .join(Conversation.members) .filter(User.id == current_user.id) ) ) ) ).order_by(Event.timestamp.desc()).limit(7).all() # Get usage stats usage_stats = DocuPulseSettings.get_usage_stats() # Room count and size logic if current_user.is_admin: logger.info("Loading admin dashboard...") room_count = Room.query.count() # Get total file and folder counts for admin file_count = RoomFile.query.filter_by(type='file').count() folder_count = RoomFile.query.filter_by(type='folder').count() logger.info(f"Admin stats - Files: {file_count}, Folders: {folder_count}") # Get total size of all files including trash total_size = db.session.query(func.sum(RoomFile.size)).filter(RoomFile.type == 'file').scalar() or 0 # Get recent activity for all files recent_activity = db.session.query( RoomFile, Room, User ).join( Room, RoomFile.room_id == Room.id ).join( User, RoomFile.uploaded_by == User.id ).filter( RoomFile.deleted == False, RoomFile.uploaded_at.isnot(None) ).order_by( RoomFile.uploaded_at.desc() ).limit(10).all() logger.info(f"Recent activity query results: {len(recent_activity)}") if len(recent_activity) == 0: # Debug query to see what files exist all_files = RoomFile.query.filter_by(deleted=False).all() logger.info(f"Total non-deleted files: {len(all_files)}") for file in all_files[:5]: # Log first 5 files for debugging logger.info(f"File: {file.name}, Uploaded: {file.uploaded_at}, Type: {file.type}") # Format the activity data formatted_activity = [] for file, room, user in recent_activity: activity = { 'name': file.name, 'type': file.type, 'room': room, 'uploader': user, 'uploaded_at': file.uploaded_at, 'is_starred': current_user in file.starred_by, 'is_deleted': file.deleted, 'can_download': True # Admin can download everything } formatted_activity.append(activity) formatted_activities = formatted_activity logger.info(f"Formatted activities: {len(formatted_activities)}") # Get storage usage by file type including trash storage_by_type = db.session.query( case( (RoomFile.name.like('%.%'), func.substring(RoomFile.name, func.strpos(RoomFile.name, '.') + 1)), else_=literal_column("'unknown'") ).label('extension'), func.count(RoomFile.id).label('count'), func.sum(RoomFile.size).label('total_size') ).filter( RoomFile.type == 'file' ).group_by('extension').all() # Get trash and starred stats for admin trash_count = RoomFile.query.filter_by(deleted=True).count() starred_count = RoomFile.query.filter(RoomFile.starred_by.contains(current_user)).count() # Get oldest trash date and total trash size oldest_trash = RoomFile.query.filter_by(deleted=True).order_by(RoomFile.deleted_at.asc()).first() oldest_trash_date = oldest_trash.deleted_at.strftime('%Y-%m-%d') if oldest_trash and oldest_trash.deleted_at else None trash_size = db.session.query(db.func.sum(RoomFile.size)).filter(RoomFile.deleted==True).scalar() or 0 # Get files that will be deleted in next 7 days seven_days_from_now = datetime.utcnow() + timedelta(days=7) thirty_days_ago = datetime.utcnow() - timedelta(days=30) pending_deletion = RoomFile.query.filter( RoomFile.deleted==True, RoomFile.deleted_at <= thirty_days_ago, RoomFile.deleted_at > thirty_days_ago - timedelta(days=7) ).count() # Get trash file type breakdown trash_by_type = db.session.query( case( (RoomFile.name.like('%.%'), func.substring(RoomFile.name, func.strpos(RoomFile.name, '.') + 1)), else_=literal_column("'unknown'") ).label('extension'), func.count(RoomFile.id).label('count') ).filter( RoomFile.deleted==True ).group_by('extension').all() else: # Get rooms the user has access to accessible_rooms = Room.query.filter(Room.members.any(id=current_user.id)).all() room_count = len(accessible_rooms) # Get file and folder counts for accessible rooms room_ids = [room.id for room in accessible_rooms] file_count = RoomFile.query.filter(RoomFile.room_id.in_(room_ids), RoomFile.type == 'file').count() folder_count = RoomFile.query.filter(RoomFile.room_id.in_(room_ids), RoomFile.type == 'folder').count() # Get total size of files in accessible rooms including trash total_size = db.session.query(func.sum(RoomFile.size)).filter( RoomFile.room_id.in_(room_ids), RoomFile.type == 'file' ).scalar() or 0 # Get recent activity for accessible rooms recent_activity = db.session.query( RoomFile, Room, User ).join( Room, RoomFile.room_id == Room.id ).join( User, RoomFile.uploaded_by == User.id ).filter( RoomFile.room_id.in_(room_ids), RoomFile.deleted == False, RoomFile.uploaded_at.isnot(None) # Ensure uploaded_at is not null ).order_by( RoomFile.uploaded_at.desc() ).limit(10).all() # Format the activity data formatted_activity = [] for file, room, user in recent_activity: # Check if user has download permission permission = RoomMemberPermission.query.filter_by( room_id=room.id, user_id=current_user.id ).first() can_download = permission and permission.can_download if permission else False activity = { 'name': file.name, 'type': file.type, 'room': room, 'uploader': user, 'uploaded_at': file.uploaded_at, 'is_starred': current_user in file.starred_by, 'is_deleted': file.deleted, 'can_download': can_download } formatted_activity.append(activity) formatted_activities = formatted_activity # Get storage usage by file type for accessible rooms storage_by_type = db.session.query( case( (RoomFile.name.like('%.%'), func.substring(RoomFile.name, func.strpos(RoomFile.name, '.') + 1)), else_=literal_column("'unknown'") ).label('extension'), func.count(RoomFile.id).label('count'), func.sum(RoomFile.size).label('total_size') ).filter( RoomFile.room_id.in_(room_ids), RoomFile.type == 'file' ).group_by('extension').all() # Get trash and starred stats for user's accessible rooms trash_count = RoomFile.query.filter(RoomFile.room_id.in_(room_ids), RoomFile.deleted==True).count() starred_count = RoomFile.query.filter(RoomFile.room_id.in_(room_ids), RoomFile.starred_by.contains(current_user)).count() # Get oldest trash date and total trash size for accessible rooms oldest_trash = RoomFile.query.filter(RoomFile.room_id.in_(room_ids), RoomFile.deleted==True).order_by(RoomFile.deleted_at.asc()).first() oldest_trash_date = oldest_trash.deleted_at.strftime('%Y-%m-%d') if oldest_trash and oldest_trash.deleted_at else None trash_size = db.session.query(db.func.sum(RoomFile.size)).filter(RoomFile.room_id.in_(room_ids), RoomFile.deleted==True).scalar() or 0 # Get files that will be deleted in next 7 days for accessible rooms seven_days_from_now = datetime.utcnow() + timedelta(days=7) thirty_days_ago = datetime.utcnow() - timedelta(days=30) pending_deletion = RoomFile.query.filter( RoomFile.room_id.in_(room_ids), RoomFile.deleted==True, RoomFile.deleted_at <= thirty_days_ago, RoomFile.deleted_at > thirty_days_ago - timedelta(days=7) ).count() # Get trash file type breakdown for accessible rooms trash_by_type = db.session.query( case( (RoomFile.name.like('%.%'), func.substring(RoomFile.name, func.strpos(RoomFile.name, '.') + 1)), else_=literal_column("'unknown'") ).label('extension'), func.count(RoomFile.id).label('count') ).filter( RoomFile.room_id.in_(room_ids), RoomFile.deleted==True ).group_by('extension').all() # Get conversation stats if current_user.is_admin: conversation_count = Conversation.query.count() message_count = Message.query.count() attachment_count = MessageAttachment.query.count() conversation_total_size = db.session.query(func.sum(MessageAttachment.size)).scalar() or 0 recent_conversations = Conversation.query.order_by(Conversation.created_at.desc()).limit(5).all() else: # Get conversations where user is a member user_conversations = Conversation.query.filter(Conversation.members.any(id=current_user.id)).all() conversation_count = len(user_conversations) # Get message count for user's conversations conversation_ids = [conv.id for conv in user_conversations] message_count = Message.query.filter(Message.conversation_id.in_(conversation_ids)).count() # Get attachment count and size for user's conversations attachment_stats = db.session.query( func.count(MessageAttachment.id).label('count'), func.sum(MessageAttachment.size).label('total_size') ).filter(MessageAttachment.message_id.in_( db.session.query(Message.id).filter(Message.conversation_id.in_(conversation_ids)) )).first() attachment_count = attachment_stats.count or 0 conversation_total_size = attachment_stats.total_size or 0 # Get recent conversations for the user recent_conversations = Conversation.query.filter( Conversation.members.any(id=current_user.id) ).order_by(Conversation.created_at.desc()).limit(5).all() return render_template('dashboard/dashboard.html', room_count=room_count, file_count=file_count, folder_count=folder_count, total_size=total_size, storage_by_type=storage_by_type, recent_activities=formatted_activities, trash_count=trash_count, pending_deletion=pending_deletion, oldest_trash_date=oldest_trash_date, trash_size=trash_size, trash_by_type=trash_by_type, starred_count=starred_count, recent_events=recent_events, recent_contacts=recent_contacts, active_count=active_count, inactive_count=inactive_count, recent_notifications=recent_notifications, unread_notifications=get_unread_count(current_user.id), conversation_count=conversation_count, message_count=message_count, attachment_count=attachment_count, conversation_total_size=conversation_total_size, recent_conversations=recent_conversations, usage_stats=usage_stats, is_admin=current_user.is_admin ) def check_instance_status(instance): """Check the status of an instance by contacting its health endpoint""" try: # Construct the health check URL health_url = f"{instance.main_url.rstrip('/')}/health" response = requests.get(health_url, timeout=30) # Increased timeout to 30 seconds if response.status_code == 200: data = response.json() return { 'status': 'active' if data.get('status') == 'healthy' else 'inactive', 'details': json.dumps(data) # Convert dictionary to JSON string } else: return { 'status': 'inactive', 'details': f"Health check returned status code {response.status_code}" } except requests.RequestException as e: return { 'status': 'inactive', 'details': str(e) } @main_bp.route('/instances') @login_required @require_password_change def instances(): if not os.environ.get('MASTER', 'false').lower() == 'true': flash('This page is only available in master instances.', 'error') return redirect(url_for('main.dashboard')) instances = Instance.query.order_by(Instance.name.asc()).all() # Get Git settings git_settings = KeyValueSettings.get_value('git_settings') gitea_url = git_settings.get('url') if git_settings else None gitea_token = git_settings.get('token') if git_settings else None gitea_repo = git_settings.get('repo') if git_settings else None for instance in instances: # Check status status_info = check_instance_status(instance) instance.status = status_info['status'] instance.status_details = status_info['details'] db.session.commit() portainer_settings = KeyValueSettings.get_value('portainer_settings') nginx_settings = KeyValueSettings.get_value('nginx_settings') cloudflare_settings = KeyValueSettings.get_value('cloudflare_settings') return render_template('main/instances.html', instances=instances, portainer_settings=portainer_settings, nginx_settings=nginx_settings, git_settings=git_settings, cloudflare_settings=cloudflare_settings) @main_bp.route('/instances/add', methods=['POST']) @login_required @require_password_change def add_instance(): if not os.environ.get('MASTER', 'false').lower() == 'true': return jsonify({'error': 'Unauthorized'}), 403 data = request.get_json() try: # Ensure company is not None - use "Unknown" as default company = data.get('company', 'Unknown') if not company: company = 'Unknown' instance = Instance( name=data['name'], company=company, payment_plan=data['payment_plan'], main_url=data['main_url'], status='inactive' # New instances start as inactive ) db.session.add(instance) db.session.commit() return jsonify({ 'message': 'Instance added successfully', 'instance': { 'id': instance.id, 'name': instance.name, 'company': instance.company, 'rooms_count': instance.rooms_count, 'conversations_count': instance.conversations_count, 'data_size': instance.data_size, 'payment_plan': instance.payment_plan, 'main_url': instance.main_url, 'status': instance.status } }) except Exception as e: db.session.rollback() return jsonify({'error': str(e)}), 400 @main_bp.route('/instances/', methods=['PUT']) @login_required @require_password_change def update_instance(instance_id): if not os.environ.get('MASTER', 'false').lower() == 'true': return jsonify({'error': 'Unauthorized'}), 403 instance = Instance.query.get_or_404(instance_id) data = request.get_json() try: instance.name = data.get('name', instance.name) # Ensure company is not None - use current value or "Unknown" as default company = data.get('company', instance.company) instance.company = company if company else "Unknown" instance.payment_plan = data.get('payment_plan', instance.payment_plan) instance.main_url = data.get('main_url', instance.main_url) instance.status = data.get('status', instance.status) db.session.commit() return jsonify({ 'message': 'Instance updated successfully', 'instance': { 'id': instance.id, 'name': instance.name, 'company': instance.company, 'rooms_count': instance.rooms_count, 'conversations_count': instance.conversations_count, 'data_size': instance.data_size, 'payment_plan': instance.payment_plan, 'main_url': instance.main_url, 'status': instance.status } }) except Exception as e: db.session.rollback() return jsonify({'error': str(e)}), 400 @main_bp.route('/instances/', methods=['DELETE']) @login_required @require_password_change def delete_instance(instance_id): if not os.environ.get('MASTER', 'false').lower() == 'true': return jsonify({'error': 'Unauthorized'}), 403 instance = Instance.query.get_or_404(instance_id) try: db.session.delete(instance) db.session.commit() return jsonify({'message': 'Instance deleted successfully'}) except Exception as e: db.session.rollback() return jsonify({'error': str(e)}), 400 @main_bp.route('/instances//status') @login_required def check_status(instance_id): if not os.environ.get('MASTER', 'false').lower() == 'true': return jsonify({'error': 'Access denied'}), 403 instance = Instance.query.get_or_404(instance_id) status_info = check_instance_status(instance) # Update instance status in database instance.status = status_info['status'] instance.status_details = status_info['details'] db.session.commit() return jsonify(status_info) @main_bp.route('/instances//save-token', methods=['POST']) @login_required @require_password_change def save_instance_token(instance_id): if not os.environ.get('MASTER', 'false').lower() == 'true': return jsonify({'error': 'Unauthorized'}), 403 instance = Instance.query.get_or_404(instance_id) data = request.get_json() if not data or 'token' not in data: return jsonify({'error': 'Token is required'}), 400 try: instance.connection_token = data['token'] db.session.commit() return jsonify({'message': 'Token saved successfully'}) except Exception as e: db.session.rollback() return jsonify({'error': str(e)}), 400 @main_bp.route('/instances//detail') @login_required @require_password_change def instance_detail(instance_id): if not os.environ.get('MASTER', 'false').lower() == 'true': flash('This page is only available in master instances.', 'error') return redirect(url_for('main.dashboard')) instance = Instance.query.get_or_404(instance_id) # Check instance status status_info = check_instance_status(instance) instance.status = status_info['status'] instance.status_details = status_info['details'] # Fetch company name from instance settings try: if instance.connection_token: # First get JWT token jwt_response = requests.post( f"{instance.main_url.rstrip('/')}/api/admin/management-token", headers={ 'X-API-Key': instance.connection_token, 'Accept': 'application/json' }, timeout=5 ) if jwt_response.status_code == 200: jwt_data = jwt_response.json() jwt_token = jwt_data.get('token') if jwt_token: # Then fetch settings with JWT token response = requests.get( f"{instance.main_url.rstrip('/')}/api/admin/settings", headers={ 'Authorization': f'Bearer {jwt_token}', 'Accept': 'application/json' }, timeout=5 ) if response.status_code == 200: data = response.json() if 'company_name' in data: # Set company to "Unknown" if company_name is None or empty company_name = data['company_name'] instance.company = company_name if company_name else "Unknown" db.session.commit() except Exception as e: current_app.logger.error(f"Error fetching instance settings: {str(e)}") return render_template('main/instance_detail.html', instance=instance) @main_bp.route('/instances//auth-status') @login_required @require_password_change def instance_auth_status(instance_id): if not os.environ.get('MASTER', 'false').lower() == 'true': return jsonify({'error': 'Unauthorized'}), 403 instance = Instance.query.get_or_404(instance_id) # Check if instance has a connection token has_token = bool(instance.connection_token) # If there's a token, verify it's still valid is_valid = False if has_token: try: # Try to get a JWT token using the connection token response = requests.post( f"{instance.main_url.rstrip('/')}/api/admin/management-token", headers={ 'X-API-Key': instance.connection_token, 'Accept': 'application/json' }, timeout=5 ) is_valid = response.status_code == 200 except Exception as e: current_app.logger.error(f"Error verifying token: {str(e)}") is_valid = False return jsonify({ 'authenticated': has_token and is_valid, 'has_token': has_token, 'is_valid': is_valid }) UPLOAD_FOLDER = '/app/uploads/profile_pics' if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) @main_bp.route('/profile', methods=['GET', 'POST']) @login_required @require_password_change def profile(): if request.method == 'POST': logger.debug(f"Profile form submitted with data: {request.form}") logger.debug(f"Files in request: {request.files}") try: # Handle profile picture removal if 'remove_picture' in request.form: logger.debug("Removing profile picture") if current_user.profile_picture: # Delete the old profile picture file old_picture_path = os.path.join(UPLOAD_FOLDER, current_user.profile_picture) if os.path.exists(old_picture_path): os.remove(old_picture_path) current_user.profile_picture = None db.session.commit() flash('Profile picture removed successfully!', 'success') return redirect(url_for('main.profile')) new_email = request.form.get('email') logger.debug(f"New email: {new_email}") # Check if the new email is already used by another user if new_email != current_user.email: existing_user = User.query.filter_by(email=new_email).first() if existing_user: flash('A user with this email already exists.', 'error') return render_template('profile/profile.html') # Handle profile picture upload file = request.files.get('profile_picture') if file and file.filename: logger.debug(f"Uploading new profile picture: {file.filename}") filename = secure_filename(file.filename) file_path = os.path.join(UPLOAD_FOLDER, filename) file.save(file_path) current_user.profile_picture = filename # Update user information current_user.username = request.form.get('first_name') current_user.last_name = request.form.get('last_name') current_user.email = new_email current_user.phone = request.form.get('phone') current_user.company = request.form.get('company') current_user.position = request.form.get('position') current_user.notes = request.form.get('notes') logger.debug(f"Updated user data: username={current_user.username}, last_name={current_user.last_name}, email={current_user.email}") # Handle password change if provided new_password = request.form.get('new_password') confirm_password = request.form.get('confirm_password') if new_password: if not confirm_password: flash('Please confirm your new password.', 'error') return render_template('profile/profile.html') if new_password != confirm_password: flash('Passwords do not match.', 'error') return render_template('profile/profile.html') current_user.set_password(new_password) # Create password change notification create_notification( notif_type='password_changed', user_id=current_user.id, details={ 'message': 'Your password has been changed successfully.', 'timestamp': datetime.utcnow().isoformat() } ) flash('Password updated successfully.', 'success') elif confirm_password: flash('Please enter a new password.', 'error') return render_template('profile/profile.html') # Create event details event_details = { 'user_id': current_user.id, 'email': current_user.email, 'update_type': 'profile_update', 'updated_fields': { 'username': current_user.username, 'last_name': current_user.last_name, 'email': current_user.email, 'phone': current_user.phone, 'company': current_user.company, 'position': current_user.position, 'notes': current_user.notes, 'profile_picture': bool(current_user.profile_picture) }, 'changes': { 'username': request.form.get('first_name'), 'last_name': request.form.get('last_name'), 'email': request.form.get('email'), 'phone': request.form.get('phone'), 'company': request.form.get('company'), 'position': request.form.get('position'), 'notes': request.form.get('notes'), 'password_changed': bool(new_password) } } logger.debug(f"Preparing to create profile update event with details: {event_details}") # Commit all changes db.session.commit() logger.debug("Profile changes and event committed to database successfully") flash('Profile updated successfully!', 'success') return redirect(url_for('main.dashboard')) except Exception as e: logger.error(f"Error updating profile: {str(e)}") logger.error(f"Full error details: {str(e.__class__.__name__)}: {str(e)}") db.session.rollback() flash('An error occurred while updating your profile.', 'error') return render_template('profile/profile.html') return render_template('profile/profile.html') @main_bp.route('/starred') @login_required @require_password_change def starred(): return render_template('starred/starred.html') @main_bp.route('/conversations') @login_required @require_password_change def conversations(): return redirect(url_for('conversations.conversations')) @main_bp.route('/trash') @login_required @require_password_change def trash(): return render_template('trash/trash.html') @main_bp.route('/notifications') @login_required @require_password_change def notifications(): # Get filter parameters notif_type = request.args.get('notif_type', '') date_range = request.args.get('date_range', '7d') page = request.args.get('page', 1, type=int) per_page = 10 # Calculate date range end_date = datetime.utcnow() if date_range == '24h': start_date = end_date - timedelta(days=1) elif date_range == '7d': start_date = end_date - timedelta(days=7) elif date_range == '30d': start_date = end_date - timedelta(days=30) else: start_date = None # Build query query = Notif.query.filter_by(user_id=current_user.id) if notif_type: query = query.filter_by(notif_type=notif_type) if start_date: query = query.filter(Notif.timestamp >= start_date) # Get total count for pagination total_notifs = query.count() total_pages = (total_notifs + per_page - 1) // per_page # Get paginated notifications notifications = query.order_by(Notif.timestamp.desc()).paginate(page=page, per_page=per_page) # Check if this is an AJAX request if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'notifications': [{ 'id': notif.id, 'notif_type': notif.notif_type, 'timestamp': notif.timestamp.strftime('%Y-%m-%d %H:%M:%S'), 'read': notif.read, 'details': notif.details, 'sender': { 'username': notif.sender.username, 'last_name': notif.sender.last_name } if notif.sender else None } for notif in notifications.items], 'total_pages': total_pages, 'current_page': page }) return render_template('notifications/notifications.html', notifications=notifications.items, total_pages=total_pages, current_page=page) @main_bp.route('/api/notifications') @login_required def get_notifications(): # Get filter parameters notif_type = request.args.get('notif_type', '') date_range = request.args.get('date_range', '7d') page = request.args.get('page', 1, type=int) per_page = 10 # Calculate date range end_date = datetime.utcnow() if date_range == '24h': start_date = end_date - timedelta(days=1) elif date_range == '7d': start_date = end_date - timedelta(days=7) elif date_range == '30d': start_date = end_date - timedelta(days=30) else: start_date = None # Build query query = Notif.query.filter_by(user_id=current_user.id) if notif_type: query = query.filter_by(notif_type=notif_type) if start_date: query = query.filter(Notif.timestamp >= start_date) # Get total count for pagination total_notifs = query.count() total_pages = (total_notifs + per_page - 1) // per_page # Get paginated notifications notifications = query.order_by(Notif.timestamp.desc()).paginate(page=page, per_page=per_page) return jsonify({ 'notifications': [{ 'id': notif.id, 'notif_type': notif.notif_type, 'timestamp': notif.timestamp.isoformat(), 'read': notif.read, 'details': notif.details, 'sender': { 'id': notif.sender.id, 'username': notif.sender.username, 'last_name': notif.sender.last_name } if notif.sender else None } for notif in notifications.items], 'total_pages': total_pages, 'current_page': page }) @main_bp.route('/api/notifications/') @login_required def get_notification_details(notif_id): notif = Notif.query.get_or_404(notif_id) if notif.user_id != current_user.id: return jsonify({'error': 'Unauthorized'}), 403 return jsonify({ 'id': notif.id, 'notif_type': notif.notif_type, 'timestamp': notif.timestamp.isoformat(), 'read': notif.read, 'details': notif.details, 'sender': { 'id': notif.sender.id, 'username': notif.sender.username } if notif.sender else None }) @main_bp.route('/api/notifications//read', methods=['POST']) @login_required def mark_notification_read(notif_id): notif = Notif.query.get_or_404(notif_id) if notif.user_id != current_user.id: return jsonify({'error': 'Unauthorized'}), 403 notif.read = True db.session.commit() return jsonify({'success': True}) @main_bp.route('/api/notifications/mark-all-read', methods=['POST']) @login_required def mark_all_notifications_read(): result = Notif.query.filter_by(user_id=current_user.id, read=False).update({'read': True}) db.session.commit() return jsonify({'success': True, 'count': result}) @main_bp.route('/api/notifications/', methods=['DELETE']) @login_required def delete_notification(notif_id): notif = Notif.query.get_or_404(notif_id) if notif.user_id != current_user.id: return jsonify({'error': 'Unauthorized'}), 403 db.session.delete(notif) db.session.commit() return jsonify({'success': True}) @main_bp.route('/settings') @login_required def settings(): if not current_user.is_admin: flash('You do not have permission to access settings.', 'error') return redirect(url_for('main.home')) active_tab = request.args.get('tab', 'colors') # Validate tab parameter valid_tabs = ['colors', 'general', 'email_templates', 'mails', 'security', 'events', 'debugging', 'smtp', 'connections'] if active_tab not in valid_tabs: active_tab = 'colors' site_settings = SiteSettings.get_settings() company_form = CompanySettingsForm() # Get SMTP settings for the SMTP tab smtp_settings = None if active_tab == 'smtp': smtp_settings = KeyValueSettings.get_value('smtp_settings') # Get connection settings portainer_settings = KeyValueSettings.get_value('portainer_settings') nginx_settings = KeyValueSettings.get_value('nginx_settings') git_settings = KeyValueSettings.get_value('git_settings') cloudflare_settings = KeyValueSettings.get_value('cloudflare_settings') # Get management API key for the connections tab management_api_key = ManagementAPIKey.query.filter_by(is_active=True).first() if management_api_key: site_settings.management_api_key = management_api_key.api_key # Get events for the events tab events = None total_pages = 0 current_page = 1 users = {} if active_tab == 'events': page = request.args.get('page', 1, type=int) per_page = 10 events = Event.query.order_by(Event.timestamp.desc()).paginate(page=page, per_page=per_page) total_pages = events.pages current_page = events.page # Get all users for the events user_ids = set() for event in events.items: user_ids.add(event.user_id) if event.details and 'target_user_id' in event.details: user_ids.add(event.details['target_user_id']) users = {user.id: user for user in User.query.filter(User.id.in_(user_ids)).all()} # Get email templates for the email templates tab email_templates = EmailTemplate.query.filter_by(is_active=True).all() # Get mails for the mails tab mails = None if active_tab == 'mails': page = request.args.get('page', 1, type=int) per_page = 10 mails = Mail.query.order_by(Mail.created_at.desc()).paginate(page=page, per_page=per_page) total_pages = mails.pages current_page = mails.page users = User.query.order_by(User.username).all() if request.method == 'GET': company_form.company_name.data = site_settings.company_name company_form.company_website.data = site_settings.company_website company_form.company_email.data = site_settings.company_email company_form.company_phone.data = site_settings.company_phone company_form.company_address.data = site_settings.company_address company_form.company_city.data = site_settings.company_city company_form.company_state.data = site_settings.company_state company_form.company_zip.data = site_settings.company_zip company_form.company_country.data = site_settings.company_country company_form.company_description.data = site_settings.company_description company_form.company_industry.data = site_settings.company_industry return render_template('settings/settings.html', primary_color=site_settings.primary_color, secondary_color=site_settings.secondary_color, active_tab=active_tab, site_settings=site_settings, events=events.items if events else None, mails=mails, total_pages=total_pages, current_page=current_page, users=users, email_templates=email_templates, form=company_form, smtp_settings=smtp_settings, portainer_settings=portainer_settings, nginx_settings=nginx_settings, git_settings=git_settings, cloudflare_settings=cloudflare_settings, csrf_token=generate_csrf()) @main_bp.route('/settings/update-smtp', methods=['POST']) @login_required def update_smtp_settings(): if not current_user.is_admin: return jsonify({'error': 'Unauthorized'}), 403 try: # Get SMTP settings from form smtp_settings = { 'smtp_host': request.form.get('smtp_host'), 'smtp_port': int(request.form.get('smtp_port')), 'smtp_security': request.form.get('smtp_security'), 'smtp_username': request.form.get('smtp_username'), 'smtp_password': request.form.get('smtp_password'), 'smtp_from_email': request.form.get('smtp_from_email'), 'smtp_from_name': request.form.get('smtp_from_name') } # Save to database using KeyValueSettings KeyValueSettings.set_value('smtp_settings', smtp_settings) flash('SMTP settings updated successfully.', 'success') return redirect(url_for('main.settings', tab='smtp')) except Exception as e: db.session.rollback() flash(f'Error updating SMTP settings: {str(e)}', 'error') return redirect(url_for('main.settings', tab='smtp')) @main_bp.route('/settings/test-smtp', methods=['POST']) @login_required def test_smtp_connection(): if not current_user.is_admin: return jsonify({'error': 'Unauthorized'}), 403 try: data = request.get_json() # Create SMTP connection if data['smtp_security'] == 'ssl': smtp = smtplib.SMTP_SSL(data['smtp_host'], int(data['smtp_port'])) else: smtp = smtplib.SMTP(data['smtp_host'], int(data['smtp_port'])) # Start TLS if needed if data['smtp_security'] == 'tls': smtp.starttls() # Login smtp.login(data['smtp_username'], data['smtp_password']) # Close connection smtp.quit() return jsonify({'success': True}) except Exception as e: return jsonify({ 'success': False, 'error': str(e) }) @main_bp.route('/settings/colors', methods=['POST']) @login_required def update_colors(): if not current_user.is_admin: flash('Only administrators can update settings.', 'error') return redirect(url_for('main.dashboard')) logger.debug(f"Form data: {request.form}") logger.debug(f"CSRF token: {request.form.get('csrf_token')}") primary_color = request.form.get('primary_color') secondary_color = request.form.get('secondary_color') logger.debug(f"Primary color: {primary_color}") logger.debug(f"Secondary color: {secondary_color}") if not primary_color or not secondary_color: flash('Both primary and secondary colors are required.', 'error') return redirect(url_for('main.settings')) site_settings = SiteSettings.get_settings() site_settings.primary_color = primary_color site_settings.secondary_color = secondary_color try: db.session.commit() logger.debug("Colors updated successfully in database") # Log the color settings update log_event( event_type='settings_update', details={ 'user_id': current_user.id, 'user_name': f"{current_user.username} {current_user.last_name}", 'update_type': 'colors', 'changes': { 'primary_color': primary_color, 'secondary_color': secondary_color } } ) db.session.commit() flash('Color settings updated successfully!', 'success') except Exception as e: logger.error(f"Error updating colors: {str(e)}") db.session.rollback() flash('An error occurred while updating color settings.', 'error') return redirect(url_for('main.settings')) @main_bp.route('/settings/colors/reset', methods=['POST']) @login_required def reset_colors(): if not current_user.is_admin: flash('Only administrators can update settings.', 'error') return redirect(url_for('main.dashboard')) site_settings = SiteSettings.get_settings() site_settings.primary_color = '#16767b' # Default from colors.css site_settings.secondary_color = '#741b5f' # Default from colors.css try: db.session.commit() flash('Colors reset to defaults successfully!', 'success') except Exception as e: db.session.rollback() flash('An error occurred while resetting colors.', 'error') return redirect(url_for('main.settings')) @main_bp.route('/settings/company', methods=['POST']) @login_required def update_company_settings(): if not current_user.is_admin: flash('Only administrators can update settings.', 'error') return redirect(url_for('main.dashboard')) form = CompanySettingsForm() if not form.validate(): flash('Please check the form for errors.', 'error') return redirect(url_for('main.settings')) site_settings = SiteSettings.get_settings() # Handle logo upload if form.company_logo.data: logo_file = form.company_logo.data if logo_file.filename: # Delete old logo if it exists if site_settings.company_logo: old_logo_path = os.path.join('static', 'uploads', 'company_logos', site_settings.company_logo) if os.path.exists(old_logo_path): os.remove(old_logo_path) # Save new logo filename = secure_filename(logo_file.filename) # Add timestamp to filename to prevent caching issues filename = f"{int(time.time())}_{filename}" logo_path = os.path.join('static', 'uploads', 'company_logos', filename) logo_file.save(logo_path) site_settings.company_logo = filename # Update all company fields site_settings.company_name = form.company_name.data site_settings.company_website = form.company_website.data site_settings.company_email = form.company_email.data site_settings.company_phone = form.company_phone.data site_settings.company_address = form.company_address.data site_settings.company_city = form.company_city.data site_settings.company_state = form.company_state.data site_settings.company_zip = form.company_zip.data site_settings.company_country = form.company_country.data site_settings.company_description = form.company_description.data site_settings.company_industry = form.company_industry.data try: db.session.commit() # Log the company settings update log_event( event_type='settings_update', details={ 'user_id': current_user.id, 'user_name': f"{current_user.username} {current_user.last_name}", 'update_type': 'company_settings', 'changes': { 'company_name': site_settings.company_name, 'company_website': site_settings.company_website, 'company_email': site_settings.company_email, 'company_phone': site_settings.company_phone, 'company_address': site_settings.company_address, 'company_city': site_settings.company_city, 'company_state': site_settings.company_state, 'company_zip': site_settings.company_zip, 'company_country': site_settings.company_country, 'company_description': site_settings.company_description, 'company_industry': site_settings.company_industry, 'logo_updated': bool(form.company_logo.data) } } ) db.session.commit() flash('Company settings updated successfully!', 'success') except Exception as e: logger.error(f"Error updating company settings: {str(e)}") db.session.rollback() flash('An error occurred while updating company settings.', 'error') return redirect(url_for('main.settings')) @main_bp.route('/dynamic-colors.css') def dynamic_colors(): """Generate dynamic CSS variables based on site settings""" logger.info(f"[Dynamic Colors] Request from: {request.referrer}") # Get colors from settings site_settings = SiteSettings.get_settings() primary_color = site_settings.primary_color secondary_color = site_settings.secondary_color logger.info(f"[Dynamic Colors] Current colors - Primary: {primary_color}, Secondary: {secondary_color}") # Convert hex to RGB for opacity calculations def hex_to_rgb(hex_color): hex_color = hex_color.lstrip('#') return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4)) def rgb_to_hex(rgb): return '#{:02x}{:02x}{:02x}'.format(rgb[0], rgb[1], rgb[2]) def lighten_color(hex_color, amount=0.2): rgb = hex_to_rgb(hex_color) rgb = tuple(min(255, int(c + (255 - c) * amount)) for c in rgb) return rgb_to_hex(rgb) # Calculate derived colors primary_rgb = hex_to_rgb(primary_color) secondary_rgb = hex_to_rgb(secondary_color) # Lighten colors for hover states primary_light = lighten_color(primary_color, 0.2) secondary_light = lighten_color(secondary_color, 0.2) # Generate CSS with opacity variables css = f""" :root {{ /* Primary Colors */ --primary-color: {primary_color}; --primary-light: {primary_light}; --primary-rgb: {primary_rgb[0]}, {primary_rgb[1]}, {primary_rgb[2]}; --primary-opacity-8: rgba({primary_rgb[0]}, {primary_rgb[1]}, {primary_rgb[2]}, 0.08); --primary-opacity-15: rgba({primary_rgb[0]}, {primary_rgb[1]}, {primary_rgb[2]}, 0.15); --primary-opacity-25: rgba({primary_rgb[0]}, {primary_rgb[1]}, {primary_rgb[2]}, 0.25); --primary-opacity-50: rgba({primary_rgb[0]}, {primary_rgb[1]}, {primary_rgb[2]}, 0.5); /* Secondary Colors */ --secondary-color: {secondary_color}; --secondary-light: {secondary_light}; --secondary-rgb: {secondary_rgb[0]}, {secondary_rgb[1]}, {secondary_rgb[2]}; --secondary-opacity-8: rgba({secondary_rgb[0]}, {secondary_rgb[1]}, {secondary_rgb[2]}, 0.08); --secondary-opacity-15: rgba({secondary_rgb[0]}, {secondary_rgb[1]}, {secondary_rgb[2]}, 0.15); --secondary-opacity-25: rgba({secondary_rgb[0]}, {secondary_rgb[1]}, {secondary_rgb[2]}, 0.25); --secondary-opacity-50: rgba({secondary_rgb[0]}, {secondary_rgb[1]}, {secondary_rgb[2]}, 0.5); /* Chart Colors */ --chart-color-1: {primary_color}; --chart-color-2: {secondary_color}; --chart-color-3: {lighten_color(primary_color, 0.4)}; --chart-color-4: {lighten_color(secondary_color, 0.4)}; }} """ logger.info(f"[Dynamic Colors] Generated CSS with primary color: {primary_color}") logger.info(f"[Dynamic Colors] Cache version: {site_settings.updated_at.timestamp()}") return Response(css, mimetype='text/css') @main_bp.route('/settings/events') @login_required def events(): if not current_user.is_admin: flash('Only administrators can access event logs.', 'error') return redirect(url_for('main.dashboard')) # Get filter parameters event_type = request.args.get('event_type') date_range = request.args.get('date_range', '7d') user_id = request.args.get('user_id') page = request.args.get('page', 1, type=int) per_page = 10 # Calculate date range end_date = datetime.utcnow() if date_range == '24h': start_date = end_date - timedelta(days=1) elif date_range == '7d': start_date = end_date - timedelta(days=7) elif date_range == '30d': start_date = end_date - timedelta(days=30) else: start_date = None # Build query query = Event.query if event_type: query = query.filter_by(event_type=event_type) if start_date: query = query.filter(Event.timestamp >= start_date) if user_id: query = query.filter_by(user_id=user_id) # Get total count for pagination total_events = query.count() total_pages = (total_events + per_page - 1) // per_page # Get paginated events events = query.order_by(Event.timestamp.desc()).paginate(page=page, per_page=per_page) # Get all users for filter dropdown users = User.query.order_by(User.username).all() # Check if this is an AJAX request if request.headers.get('X-Requested-With') == 'XMLHttpRequest': logger.info(f"Processing AJAX request for events. Found {len(events.items)} events") return render_template('settings/tabs/events.html', events=events.items, total_pages=total_pages, current_page=page, event_type=event_type, date_range=date_range, user_id=user_id, users=users, csrf_token=generate_csrf()) # For full page requests, render the full settings page site_settings = SiteSettings.get_settings() return render_template('settings/settings.html', primary_color=site_settings.primary_color, secondary_color=site_settings.secondary_color, active_tab='events', site_settings=site_settings, events=events.items, total_pages=total_pages, current_page=page, users=users, csrf_token=generate_csrf()) @main_bp.route('/api/events') @login_required def get_events(): if not current_user.is_admin: return jsonify({'error': 'Unauthorized'}), 403 # Get filter parameters event_type = request.args.get('event_type') date_range = request.args.get('date_range', '7d') user_id = request.args.get('user_id') page = request.args.get('page', 1, type=int) per_page = 10 # Calculate date range end_date = datetime.utcnow() if date_range == '24h': start_date = end_date - timedelta(days=1) elif date_range == '7d': start_date = end_date - timedelta(days=7) elif date_range == '30d': start_date = end_date - timedelta(days=30) else: start_date = None # Build query query = Event.query if event_type: query = query.filter_by(event_type=event_type) if start_date: query = query.filter(Event.timestamp >= start_date) if user_id: query = query.filter_by(user_id=user_id) # Get total count for pagination total_events = query.count() total_pages = (total_events + per_page - 1) // per_page # Get paginated events events = query.order_by(Event.timestamp.desc()).paginate(page=page, per_page=per_page) return jsonify({ 'events': [{ 'id': event.id, 'event_type': event.event_type, 'timestamp': event.timestamp.isoformat(), 'user': { 'id': event.user.id, 'username': event.user.username, 'last_name': event.user.last_name } if event.user else None, 'ip_address': event.ip_address, 'details': event.details } for event in events.items], 'current_page': page, 'total_pages': total_pages }) @main_bp.route('/api/events/') @login_required def get_event_details(event_id): if not current_user.is_admin: return jsonify({'error': 'Unauthorized'}), 403 event = Event.query.get_or_404(event_id) return jsonify({ 'id': event.id, 'event_type': event.event_type, 'timestamp': event.timestamp.isoformat(), 'details': event.details, 'ip_address': event.ip_address, 'user_agent': event.user_agent, 'user': { 'id': event.user.id, 'username': event.user.username, 'last_name': event.user.last_name, 'email': event.user.email } if event.user else None }) @main_bp.route('/settings/events/download') @login_required def download_events(): if not current_user.is_admin: flash('Only administrators can download event logs.', 'error') return redirect(url_for('main.dashboard')) # Get filter parameters event_type = request.args.get('event_type') date_range = request.args.get('date_range', '7d') user_id = request.args.get('user_id') # Calculate date range end_date = datetime.utcnow() if date_range == '24h': start_date = end_date - timedelta(days=1) elif date_range == '7d': start_date = end_date - timedelta(days=7) elif date_range == '30d': start_date = end_date - timedelta(days=30) else: start_date = None # Build query query = Event.query if event_type: query = query.filter_by(event_type=event_type) if start_date: query = query.filter(Event.timestamp >= start_date) if user_id: query = query.filter_by(user_id=user_id) # Get all events events = query.order_by(Event.timestamp.desc()).all() # Create CSV content import csv import io output = io.StringIO() writer = csv.writer(output) # Write header writer.writerow(['Timestamp', 'Event Type', 'User', 'Details', 'IP Address']) # Write data for event in events: user_name = f"{event.user.username} {event.user.last_name}" if event.user else "System" writer.writerow([ event.timestamp.strftime('%Y-%m-%d %H:%M:%S'), event.event_type, user_name, str(event.details), event.ip_address ]) # Create the response output.seek(0) return Response( output, mimetype='text/csv', headers={ 'Content-Disposition': f'attachment; filename=event_log_{datetime.utcnow().strftime("%Y%m%d_%H%M%S")}.csv' } ) @main_bp.route('/settings/save-portainer-connection', methods=['POST']) @login_required def save_portainer_connection(): if not current_user.is_admin: return jsonify({'error': 'Unauthorized'}), 403 data = request.get_json() url = data.get('url') api_key = data.get('api_key') if not url or not api_key: return jsonify({'error': 'Missing required fields'}), 400 try: # Save Portainer settings KeyValueSettings.set_value('portainer_settings', { 'url': url, 'api_key': api_key }) return jsonify({'message': 'Settings saved successfully'}) except Exception as e: return jsonify({'error': str(e)}), 500 @main_bp.route('/settings/save-nginx-connection', methods=['POST']) @login_required def save_nginx_connection(): if not current_user.is_admin: return jsonify({'error': 'Unauthorized'}), 403 data = request.get_json() url = data.get('url') username = data.get('username') password = data.get('password') if not url or not username or not password: return jsonify({'error': 'Missing required fields'}), 400 try: # Save NGINX Proxy Manager settings KeyValueSettings.set_value('nginx_settings', { 'url': url, 'username': username, 'password': password }) return jsonify({'message': 'Settings saved successfully'}) except Exception as e: return jsonify({'error': str(e)}), 500 @main_bp.route('/settings/save-git-connection', methods=['POST']) @login_required def save_git_connection(): if not current_user.is_admin: return jsonify({'error': 'Unauthorized'}), 403 data = request.get_json() provider = data.get('provider') url = data.get('url') username = data.get('username') token = data.get('token') repo = data.get('repo') if not provider or not url or not username or not token or not repo: return jsonify({'error': 'Missing required fields'}), 400 if provider not in ['gitea', 'gitlab']: return jsonify({'error': 'Invalid provider'}), 400 try: # Save Git settings KeyValueSettings.set_value('git_settings', { 'provider': provider, 'url': url, 'username': username, 'token': token, 'repo': repo }) return jsonify({'message': 'Settings saved successfully'}) except Exception as e: return jsonify({'error': str(e)}), 500 @main_bp.route('/settings/test-git-connection', methods=['POST']) @login_required def test_git_connection(): if not current_user.is_admin: return jsonify({'error': 'Unauthorized'}), 403 data = request.get_json() provider = data.get('provider') url = data.get('url') username = data.get('username') token = data.get('token') if not provider or not url or not username or not token: return jsonify({'error': 'Missing required fields'}), 400 if provider not in ['gitea', 'gitlab']: return jsonify({'error': 'Invalid provider'}), 400 try: if provider == 'gitea': # Test Gitea connection with different authentication methods headers = { 'Accept': 'application/json' } # First try token in Authorization header headers['Authorization'] = f'token {token}' # Try to get user information response = requests.get( f'{url.rstrip("/")}/api/v1/user', headers=headers, timeout=5 ) # If that fails, try token as query parameter if response.status_code != 200: response = requests.get( f'{url.rstrip("/")}/api/v1/user?token={token}', headers={'Accept': 'application/json'}, timeout=5 ) if response.status_code == 200: return jsonify({'message': 'Connection successful'}) else: return jsonify({'error': f'Connection failed: {response.json().get("message", "Unknown error")}'}), 400 elif provider == 'gitlab': # Test GitLab connection headers = { 'PRIVATE-TOKEN': token, 'Accept': 'application/json' } # Try to get user information response = requests.get( f'{url.rstrip("/")}/api/v4/user', headers=headers, timeout=5 ) if response.status_code == 200: return jsonify({'message': 'Connection successful'}) else: return jsonify({'error': f'Connection failed: {response.json().get("message", "Unknown error")}'}), 400 except Exception as e: return jsonify({'error': f'Connection failed: {str(e)}'}), 400 @main_bp.route('/settings/save-cloudflare-connection', methods=['POST']) @login_required def save_cloudflare_connection(): if not current_user.is_admin: return jsonify({'error': 'Unauthorized'}), 403 data = request.get_json() email = data.get('email') api_key = data.get('api_key') zone_id = data.get('zone_id') server_ip = data.get('server_ip') if not email or not api_key or not zone_id or not server_ip: return jsonify({'error': 'Missing required fields'}), 400 try: # Save Cloudflare settings KeyValueSettings.set_value('cloudflare_settings', { 'email': email, 'api_key': api_key, 'zone_id': zone_id, 'server_ip': server_ip }) return jsonify({'message': 'Settings saved successfully'}) except Exception as e: return jsonify({'error': str(e)}), 500 @main_bp.route('/settings/test-cloudflare-connection', methods=['POST']) @login_required def test_cloudflare_connection(): if not current_user.is_admin: return jsonify({'error': 'Unauthorized'}), 403 data = request.get_json() email = data.get('email') api_key = data.get('api_key') zone_id = data.get('zone_id') server_ip = data.get('server_ip') if not email or not api_key or not zone_id or not server_ip: return jsonify({'error': 'Missing required fields'}), 400 try: # Test Cloudflare connection by getting zone details headers = { 'X-Auth-Email': email, 'X-Auth-Key': api_key, 'Content-Type': 'application/json' } # Try to get zone information response = requests.get( f'https://api.cloudflare.com/client/v4/zones/{zone_id}', headers=headers, timeout=10 ) if response.status_code == 200: zone_data = response.json() if zone_data.get('success'): return jsonify({'message': 'Connection successful'}) else: return jsonify({'error': f'API error: {zone_data.get("errors", [{}])[0].get("message", "Unknown error")}'}), 400 else: return jsonify({'error': f'Connection failed: HTTP {response.status_code}'}), 400 except Exception as e: return jsonify({'error': f'Connection failed: {str(e)}'}), 400 @main_bp.route('/instances/launch-progress') @login_required @require_password_change def launch_progress(): if not os.environ.get('MASTER', 'false').lower() == 'true': flash('This page is only available in master instances.', 'error') return redirect(url_for('main.dashboard')) # Get NGINX settings nginx_settings = KeyValueSettings.get_value('nginx_settings') # Get Portainer settings portainer_settings = KeyValueSettings.get_value('portainer_settings') # Get Cloudflare settings cloudflare_settings = KeyValueSettings.get_value('cloudflare_settings') return render_template('main/launch_progress.html', nginx_settings=nginx_settings, portainer_settings=portainer_settings, cloudflare_settings=cloudflare_settings) @main_bp.route('/api/check-dns', methods=['POST']) @login_required @require_password_change def check_dns(): if not os.environ.get('MASTER', 'false').lower() == 'true': return jsonify({'error': 'Unauthorized'}), 403 data = request.get_json() if not data or 'domains' not in data: return jsonify({'error': 'No domains provided'}), 400 domains = data['domains'] results = {} for domain in domains: try: # Try to resolve the domain ip_address = socket.gethostbyname(domain) results[domain] = { 'resolved': True, 'ip': ip_address } except socket.gaierror: results[domain] = { 'resolved': False, 'error': 'No DNS record found' } return jsonify({ 'success': True, 'results': results }) @main_bp.route('/api/check-cloudflare-connection', methods=['POST']) @login_required @require_password_change def check_cloudflare_connection(): if not os.environ.get('MASTER', 'false').lower() == 'true': return jsonify({'error': 'Unauthorized'}), 403 # Get Cloudflare settings cloudflare_settings = KeyValueSettings.get_value('cloudflare_settings') if not cloudflare_settings: return jsonify({'error': 'Cloudflare settings not configured'}), 400 try: # Test Cloudflare connection by getting zone details headers = { 'X-Auth-Email': cloudflare_settings['email'], 'X-Auth-Key': cloudflare_settings['api_key'], 'Content-Type': 'application/json' } # Try to get zone information response = requests.get( f'https://api.cloudflare.com/client/v4/zones/{cloudflare_settings["zone_id"]}', headers=headers, timeout=10 ) if response.status_code == 200: zone_data = response.json() if zone_data.get('success'): return jsonify({ 'success': True, 'message': 'Cloudflare connection successful', 'zone_name': zone_data['result']['name'] }) else: return jsonify({'error': f'API error: {zone_data.get("errors", [{}])[0].get("message", "Unknown error")}'}), 400 else: return jsonify({'error': f'Connection failed: HTTP {response.status_code}'}), 400 except Exception as e: return jsonify({'error': f'Connection failed: {str(e)}'}), 400 @main_bp.route('/api/create-dns-records', methods=['POST']) @login_required @require_password_change def create_dns_records(): if not os.environ.get('MASTER', 'false').lower() == 'true': return jsonify({'error': 'Unauthorized'}), 403 data = request.get_json() if not data or 'domains' not in data: return jsonify({'error': 'No domains provided'}), 400 domains = data['domains'] # Get Cloudflare settings cloudflare_settings = KeyValueSettings.get_value('cloudflare_settings') if not cloudflare_settings: return jsonify({'error': 'Cloudflare settings not configured'}), 400 try: headers = { 'X-Auth-Email': cloudflare_settings['email'], 'X-Auth-Key': cloudflare_settings['api_key'], 'Content-Type': 'application/json' } results = {} for domain in domains: # Check if DNS record already exists response = requests.get( f'https://api.cloudflare.com/client/v4/zones/{cloudflare_settings["zone_id"]}/dns_records', headers=headers, params={'name': domain}, timeout=10 ) if response.status_code == 200: dns_data = response.json() existing_records = dns_data.get('result', []) # Filter for A records a_records = [record for record in existing_records if record['type'] == 'A' and record['name'] == domain] if a_records: # Update existing A record record_id = a_records[0]['id'] update_data = { 'type': 'A', 'name': domain, 'content': cloudflare_settings['server_ip'], 'ttl': 1, # Auto TTL 'proxied': True } update_response = requests.put( f'https://api.cloudflare.com/client/v4/zones/{cloudflare_settings["zone_id"]}/dns_records/{record_id}', headers=headers, json=update_data, timeout=10 ) if update_response.status_code == 200: results[domain] = {'status': 'updated', 'message': 'DNS record updated'} else: results[domain] = {'status': 'error', 'message': f'Failed to update DNS record: {update_response.status_code}'} else: # Create new A record create_data = { 'type': 'A', 'name': domain, 'content': cloudflare_settings['server_ip'], 'ttl': 1, # Auto TTL 'proxied': True } create_response = requests.post( f'https://api.cloudflare.com/client/v4/zones/{cloudflare_settings["zone_id"]}/dns_records', headers=headers, json=create_data, timeout=10 ) if create_response.status_code == 200: results[domain] = {'status': 'created', 'message': 'DNS record created'} else: results[domain] = {'status': 'error', 'message': f'Failed to create DNS record: {create_response.status_code}'} else: results[domain] = {'status': 'error', 'message': f'Failed to check existing records: {response.status_code}'} return jsonify({ 'success': True, 'results': results }) except Exception as e: return jsonify({'error': f'DNS operation failed: {str(e)}'}), 400 @main_bp.route('/api/mails/') @login_required def get_mail_details(mail_id): if not current_user.is_admin: return jsonify({'error': 'Unauthorized'}), 403 mail = Mail.query.get_or_404(mail_id) return jsonify({ 'id': mail.id, 'recipient': mail.recipient, 'subject': mail.subject, 'body': mail.body, 'status': mail.status, 'created_at': mail.created_at.isoformat(), 'sent_at': mail.sent_at.isoformat() if mail.sent_at else None, 'template': { 'id': mail.template.id, 'name': mail.template.name } if mail.template else None }) @main_bp.route('/development-wiki') @login_required @require_password_change def development_wiki(): if not os.environ.get('MASTER', 'false').lower() == 'true': flash('This page is only available in master instances.', 'error') return redirect(url_for('main.dashboard')) return render_template('wiki/base.html') @main_bp.route('/api/version') def api_version(): # Get version information from environment variables version = os.getenv('APP_VERSION', 'unknown') commit = os.getenv('GIT_COMMIT', 'unknown') branch = os.getenv('GIT_BRANCH', 'unknown') deployed_at = os.getenv('DEPLOYED_AT', 'unknown') return jsonify({ 'version': version, 'tag': version, 'commit': commit, 'branch': branch, 'deployed_at': deployed_at })