"""Admin REST API blueprint.

Exposes JWT-protected endpoints for managing key/value settings, contacts,
site settings, logs and external connection settings (Portainer, NGINX
Proxy Manager, Gitea/GitLab).
"""
import json
import os
import secrets
from datetime import datetime, timedelta
from functools import wraps
from urllib.parse import quote

import jwt
import requests
from flask import (
    Blueprint,
    current_app,
    flash,
    jsonify,
    make_response,
    redirect,
    request,
    url_for,
)
from flask_login import login_user
from werkzeug.security import generate_password_hash

from extensions import db, csrf
from models import (
    KeyValueSettings,
    User,
    Room,
    Conversation,
    RoomFile,
    SiteSettings,
    DocuPulseSettings,
    Event,
    Mail,
    ManagementAPIKey,
    PasswordSetupToken,
)

admin_api = Blueprint('admin_api', __name__)

# Seconds to wait for an outbound HTTP call before giving up, so a slow or
# unreachable remote host cannot block a worker indefinitely.
_CONNECTION_TEST_TIMEOUT = 5
_GIT_API_TIMEOUT = 10

_VALID_ROLES = ('admin', 'manager', 'user')

# Attributes of SiteSettings exposed by GET /settings.
_SITE_SETTINGS_FIELDS = (
    'primary_color',
    'secondary_color',
    'company_name',
    'company_logo',
    'company_website',
    'company_email',
    'company_phone',
    'company_address',
    'company_city',
    'company_state',
    'company_zip',
    'company_country',
    'company_description',
    'company_industry',
)


def _json_body():
    """Return the request's JSON object, or ``{}`` if absent or not an object."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _has_fields(data, *fields):
    """Return True when every name in *fields* is a key of *data*."""
    return all(field in data for field in fields)


def _is_admin_user(current_user):
    """Return True for a real admin user.

    Management-tool tokens are passed to views as ``None`` by
    ``token_required``; they are not user accounts and yield False here.
    """
    return current_user is not None and bool(current_user.is_admin)


def _serialize_user(user):
    """Return the JSON-serializable contact representation of *user*."""
    if user.is_admin:
        role = 'admin'
    elif user.is_manager:
        role = 'manager'
    else:
        role = 'user'
    return {
        'id': user.id,
        'username': user.username,
        'email': user.email,
        'last_name': user.last_name,
        'phone': user.phone,
        'company': user.company,
        'position': user.position,
        'is_active': user.is_active,
        'is_admin': user.is_admin,
        'is_manager': user.is_manager,
        'role': role,
        'created_at': user.created_at.isoformat(),
    }


def _remote_error_message(response):
    """Extract the ``message`` field of a remote JSON error, if there is one."""
    try:
        payload = response.json()
    except ValueError:
        # The remote answered with something that is not JSON (e.g. HTML).
        return 'Unknown error'
    if isinstance(payload, dict):
        return payload.get('message', 'Unknown error')
    return 'Unknown error'


def _gitea_get(base_url, path, token):
    """GET a Gitea API path, trying header auth first, then a query token."""
    url = f'{base_url}{path}'
    response = requests.get(
        url,
        headers={
            'Accept': 'application/json',
            'Authorization': f'token {token}',
        },
        timeout=_GIT_API_TIMEOUT,
    )
    if response.status_code != 200:
        # Fall back to passing the token as a query parameter; ``params``
        # makes requests URL-encode the value.
        response = requests.get(
            url,
            headers={'Accept': 'application/json'},
            params={'token': token},
            timeout=_GIT_API_TIMEOUT,
        )
    return response


def add_cors_headers(response):
    """Add CORS headers to the response"""
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'
    response.headers['Access-Control-Allow-Headers'] = (
        'Content-Type, Authorization, X-API-Key, X-CSRF-Token'
    )
    return response


@admin_api.before_request
def handle_preflight():
    """Handle preflight requests"""
    if request.method == 'OPTIONS':
        return add_cors_headers(make_response())
    return None


@admin_api.after_request
def after_request(response):
    """Add CORS headers to all responses"""
    return add_cors_headers(response)


def token_required(f):
    """Require a valid JWT in the ``Authorization`` header.

    The wrapped view receives the authenticated admin ``User`` as its first
    positional argument, or ``None`` when the token is a management-tool
    token (``is_management`` claim). Requests with a missing or invalid
    token, or a token for a non-admin user, get a 401 response.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        # Expected form is "<scheme> <token>"; a header without a second
        # part is treated as a missing token instead of raising IndexError.
        parts = request.headers.get('Authorization', '').split()
        token = parts[1] if len(parts) >= 2 else None
        if not token:
            return jsonify({'message': 'Token is missing!'}), 401

        try:
            data = jwt.decode(
                token, current_app.config['SECRET_KEY'], algorithms=["HS256"]
            )
        except jwt.InvalidTokenError:
            return jsonify({'message': 'Invalid token!'}), 401

        # The view is called outside the try block so that errors raised by
        # the view itself are not misreported as an invalid token.
        if data.get('is_management'):
            # Pass None as current_user for management tool
            return f(None, *args, **kwargs)

        user_id = data.get('user_id')
        if user_id is None:
            return jsonify({'message': 'Invalid token!'}), 401

        current_user = User.query.get(user_id)
        if not current_user or not current_user.is_admin:
            return jsonify({'message': 'Invalid token or insufficient permissions!'}), 401
        return f(current_user, *args, **kwargs)
    return decorated


def generate_management_api_key():
    """Generate a secure API key for the management tool"""
    return secrets.token_urlsafe(32)


def validate_management_api_key(api_key):
    """Validate if the provided API key is valid

    A matching active key has its ``last_used_at`` timestamp updated.
    """
    key = ManagementAPIKey.query.filter_by(api_key=api_key, is_active=True).first()
    if not key:
        return False
    key.last_used_at = datetime.utcnow()
    db.session.commit()
    return True


@admin_api.route('/login', methods=['POST'])
@csrf.exempt
def admin_login():
    """Authenticate an admin user.

    JSON clients receive a JWT valid for one day; form submissions get a
    login session and a redirect to the dashboard.
    """
    # Computed before the try block so the error handler can always use it.
    is_api_request = (
        request.headers.get('Accept') == 'application/json' or request.is_json
    )
    try:
        if is_api_request:
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                data = None
        else:
            data = request.form

        if not data or 'email' not in data or 'password' not in data:
            if is_api_request:
                return jsonify({
                    'message': 'Email and password are required',
                    'status': 'error'
                }), 400
            flash('Email and password are required', 'error')
            return redirect(url_for('auth.login'))

        user = User.query.filter_by(email=data['email']).first()
        if not user or not user.is_admin or not user.check_password(data['password']):
            if is_api_request:
                return jsonify({
                    'message': 'Invalid credentials or not an admin',
                    'status': 'error'
                }), 401
            flash('Invalid credentials or not an admin', 'error')
            return redirect(url_for('auth.login'))

        # For API requests, return JWT token
        if is_api_request:
            token = jwt.encode({
                'user_id': user.id,
                'is_admin': True,
                'exp': datetime.utcnow() + timedelta(days=1)  # Token expires in 1 day
            }, current_app.config['SECRET_KEY'], algorithm="HS256")
            return jsonify({
                'token': token,
                'status': 'success'
            }), 200

        # For web requests, use session-based auth
        login_user(user)
        return redirect(url_for('main.dashboard'))
    except Exception as e:
        current_app.logger.error(f"Login error: {str(e)}")
        if is_api_request:
            return jsonify({
                'message': 'An error occurred during login',
                'status': 'error'
            }), 500
        flash('An error occurred during login', 'error')
        return redirect(url_for('auth.login'))


@admin_api.route('/management-token', methods=['POST'])
@csrf.exempt
def get_management_token():
    """Generate a JWT token for the management tool using API key authentication"""
    api_key = request.headers.get('X-API-Key')
    if not api_key or not validate_management_api_key(api_key):
        return jsonify({'message': 'Invalid API key'}), 401

    # Create a token without expiration.
    # NOTE(review): token_required does not re-check the API key, so
    # revoking a key does not invalidate tokens already issued from it.
    token = jwt.encode({
        'user_id': 0,  # Special user ID for management tool
        'is_management': True
    }, current_app.config['SECRET_KEY'], algorithm="HS256")
    return jsonify({'token': token})


@admin_api.route('/management-api-key', methods=['POST'])
@csrf.exempt
@token_required
def create_management_api_key(current_user):
    """Create a new API key for the management tool (only accessible by admin users)"""
    if not _is_admin_user(current_user):
        return jsonify({'message': 'Insufficient permissions'}), 403

    data = _json_body()
    if 'name' not in data:
        return jsonify({'message': 'Name is required'}), 400

    api_key = generate_management_api_key()
    key = ManagementAPIKey(
        api_key=api_key,
        name=data['name'],
        created_by=current_user.id
    )
    db.session.add(key)
    db.session.commit()

    return jsonify({
        'api_key': api_key,
        'name': key.name,
        'created_at': key.created_at.isoformat(),
        'message': 'API key generated successfully. Store this key securely as it will not be shown again.'
    }), 201


@admin_api.route('/management-api-keys', methods=['GET'])
@csrf.exempt
@token_required
def list_management_api_keys(current_user):
    """List all management API keys (only accessible by admin users)"""
    if not _is_admin_user(current_user):
        return jsonify({'message': 'Insufficient permissions'}), 403

    keys = ManagementAPIKey.query.all()
    return jsonify([{
        'id': key.id,
        'name': key.name,
        'created_at': key.created_at.isoformat(),
        'last_used_at': key.last_used_at.isoformat() if key.last_used_at else None,
        'is_active': key.is_active,
        'created_by': key.created_by
    } for key in keys])


# The view takes key_id, so the rule must capture it from the URL.
@admin_api.route('/management-api-key/<int:key_id>', methods=['DELETE'])
@csrf.exempt
@token_required
def revoke_management_api_key(current_user, key_id):
    """Revoke a management API key (only accessible by admin users)"""
    if not _is_admin_user(current_user):
        return jsonify({'message': 'Insufficient permissions'}), 403

    key = ManagementAPIKey.query.get(key_id)
    if not key:
        return jsonify({'message': 'API key not found'}), 404

    key.is_active = False
    db.session.commit()
    return jsonify({'message': 'API key revoked successfully'})


# Key-Value Settings CRUD
@admin_api.route('/key-value', methods=['GET'])
@csrf.exempt
@token_required
def get_key_values(current_user):
    """Return every key/value setting as a list of ``{key, value}`` objects."""
    settings = KeyValueSettings.query.all()
    return jsonify([{'key': s.key, 'value': s.value} for s in settings])


@admin_api.route('/key-value/<key>', methods=['GET'])
@csrf.exempt
@token_required
def get_key_value(current_user, key):
    """Return the setting stored under *key*, or 404 when it does not exist."""
    setting = KeyValueSettings.query.filter_by(key=key).first()
    if not setting:
        return jsonify({'message': 'Key not found'}), 404
    return jsonify({'key': setting.key, 'value': setting.value})


@admin_api.route('/key-value', methods=['POST'])
@csrf.exempt
@token_required
def create_key_value(current_user):
    """Create a key/value setting from a JSON body with ``key`` and ``value``."""
    data = _json_body()
    if not _has_fields(data, 'key', 'value'):
        return jsonify({'message': 'Missing key or value'}), 400

    # Reject duplicates explicitly instead of creating a second row (or
    # failing at commit time) for a key that already exists.
    if KeyValueSettings.query.filter_by(key=data['key']).first():
        return jsonify({'message': 'Key already exists'}), 400

    setting = KeyValueSettings(key=data['key'], value=data['value'])
    db.session.add(setting)
    db.session.commit()
    return jsonify({'message': 'Key-value pair created successfully'}), 201


@admin_api.route('/key-value/<key>', methods=['PUT'])
@csrf.exempt
@token_required
def update_key_value(current_user, key):
    """Replace the value stored under *key* with the JSON body's ``value``."""
    setting = KeyValueSettings.query.filter_by(key=key).first()
    if not setting:
        return jsonify({'message': 'Key not found'}), 404

    data = _json_body()
    if 'value' not in data:
        return jsonify({'message': 'Missing value'}), 400

    setting.value = data['value']
    db.session.commit()
    return jsonify({'message': 'Key-value pair updated successfully'})


@admin_api.route('/key-value/<key>', methods=['DELETE'])
@csrf.exempt
@token_required
def delete_key_value(current_user, key):
    """Delete the setting stored under *key*, or 404 when it does not exist."""
    setting = KeyValueSettings.query.filter_by(key=key).first()
    if not setting:
        return jsonify({'message': 'Key not found'}), 404

    db.session.delete(setting)
    db.session.commit()
    return jsonify({'message': 'Key-value pair deleted successfully'})


# Contacts (Users) CRUD
@admin_api.route('/contacts', methods=['GET'])
@csrf.exempt
@token_required
def get_contacts(current_user):
    """Return all users in their contact representation."""
    return jsonify([_serialize_user(user) for user in User.query.all()])


@admin_api.route('/contacts/<int:user_id>', methods=['GET'])
@csrf.exempt
@token_required
def get_contact(current_user, user_id):
    """Return one user by id, or 404 when it does not exist."""
    user = User.query.get(user_id)
    if not user:
        return jsonify({'message': 'User not found'}), 404
    return jsonify(_serialize_user(user))


@admin_api.route('/contacts', methods=['POST'])
@csrf.exempt
@token_required
def create_contact(current_user):
    """Create a user from a JSON body.

    Required fields: ``username``, ``email``, ``last_name``, ``role``.
    Optional: ``phone``, ``company``, ``position``, ``password``.
    """
    data = _json_body()
    if not _has_fields(data, 'username', 'email', 'last_name', 'role'):
        return jsonify({'message': 'Missing required fields'}), 400

    if User.query.filter_by(email=data['email']).first():
        return jsonify({'message': 'Email already exists'}), 400

    # Validate role
    if data['role'] not in _VALID_ROLES:
        return jsonify({'message': 'Invalid role'}), 400

    user = User(
        username=data['username'],
        email=data['email'],
        last_name=data['last_name'],
        phone=data.get('phone'),
        company=data.get('company'),
        position=data.get('position'),
        is_admin=data['role'] == 'admin',
        is_manager=data['role'] == 'manager'
    )
    # Without an explicit password, assign a random one rather than a
    # well-known default that anyone could use to log in as the new account.
    user.set_password(data.get('password') or secrets.token_urlsafe(16))
    db.session.add(user)
    db.session.commit()
    return jsonify({'message': 'Contact created successfully', 'id': user.id}), 201


@admin_api.route('/contacts/<int:user_id>', methods=['PUT'])
@csrf.exempt
@token_required
def update_contact(current_user, user_id):
    """Update the fields of a user that are present in the JSON body."""
    user = User.query.get(user_id)
    if not user:
        return jsonify({'message': 'User not found'}), 404

    data = _json_body()

    # Validate everything before mutating the user, so a rejected request
    # leaves no partially applied changes on the session object.
    new_email = data.get('email', user.email)
    if new_email != user.email and User.query.filter_by(email=new_email).first():
        return jsonify({'message': 'Email already exists'}), 400
    if 'role' in data and data['role'] not in _VALID_ROLES:
        return jsonify({'message': 'Invalid role'}), 400

    user.email = new_email
    if 'role' in data:
        user.is_admin = data['role'] == 'admin'
        user.is_manager = data['role'] == 'manager'

    user.username = data.get('username', user.username)
    user.last_name = data.get('last_name', user.last_name)
    user.phone = data.get('phone', user.phone)
    user.company = data.get('company', user.company)
    user.position = data.get('position', user.position)
    user.is_active = data.get('is_active', user.is_active)

    if 'password' in data:
        user.set_password(data['password'])

    db.session.commit()
    return jsonify({'message': 'Contact updated successfully'})


@admin_api.route('/contacts/<int:user_id>', methods=['DELETE'])
@csrf.exempt
@token_required
def delete_contact(current_user, user_id):
    """Delete a user by id, or 404 when it does not exist."""
    user = User.query.get(user_id)
    if not user:
        return jsonify({'message': 'User not found'}), 404

    db.session.delete(user)
    db.session.commit()
    return jsonify({'message': 'Contact deleted successfully'})


# Statistics
@admin_api.route('/statistics', methods=['GET'])
@csrf.exempt
@token_required
def get_statistics(current_user):
    """Return room and conversation counts plus the summed file sizes."""
    # Fetch only the size column instead of loading every RoomFile row.
    sizes = RoomFile.query.with_entities(RoomFile.size).all()
    total_storage = sum(size for (size,) in sizes if size)

    return jsonify({
        'rooms': Room.query.count(),
        'conversations': Conversation.query.count(),
        'total_storage': total_storage
    })


# Website Settings CRUD
@admin_api.route('/settings', methods=['GET'])
@csrf.exempt
@token_required
def get_settings(current_user):
    """Return the site's branding and company settings."""
    settings = SiteSettings.get_settings()
    return jsonify({
        field: getattr(settings, field) for field in _SITE_SETTINGS_FIELDS
    })


@admin_api.route('/settings', methods=['PUT'])
@csrf.exempt
@token_required
def update_settings(current_user):
    """Update site settings from a JSON object of ``column: value`` pairs.

    Only mapped, non-primary-key columns of ``SiteSettings`` can be set;
    any other key in the body is ignored.
    """
    settings = SiteSettings.get_settings()
    data = _json_body()

    # Restrict assignment to real columns so a request cannot overwrite the
    # primary key, methods or other attributes of the model object.
    writable = {
        column.key
        for column in SiteSettings.__table__.columns
        if not column.primary_key
    }
    for key, value in data.items():
        if key in writable:
            setattr(settings, key, value)

    db.session.commit()
    return jsonify({'message': 'Settings updated successfully'})


# Website Logs
@admin_api.route('/logs', methods=['GET'])
@csrf.exempt
@token_required
def get_logs(current_user):
    """Return a page of events, newest first.

    Query parameters: ``page`` (default 1) and ``per_page`` (default 50).
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 50, type=int)

    events = Event.query.order_by(Event.timestamp.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )

    return jsonify({
        'events': [{
            'id': event.id,
            'event_type': event.event_type,
            'user_id': event.user_id,
            'timestamp': event.timestamp.isoformat(),
            'details': event.details,
            'ip_address': event.ip_address,
            'user_agent': event.user_agent
        } for event in events.items],
        'total': events.total,
        'pages': events.pages,
        'current_page': events.page
    })


# Mail Logs
@admin_api.route('/mail-logs', methods=['GET'])
@csrf.exempt
@token_required
def get_mail_logs(current_user):
    """Return a page of mail records, newest first.

    Query parameters: ``page`` (default 1) and ``per_page`` (default 50).
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 50, type=int)

    mails = Mail.query.order_by(Mail.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )

    return jsonify({
        'mails': [{
            'id': mail.id,
            'recipient': mail.recipient,
            'subject': mail.subject,
            'status': mail.status,
            'created_at': mail.created_at.isoformat(),
            'sent_at': mail.sent_at.isoformat() if mail.sent_at else None,
            'template_id': mail.template_id
        } for mail in mails.items],
        'total': mails.total,
        'pages': mails.pages,
        'current_page': mails.page
    })


# Resend Setup Mail
@admin_api.route('/resend-setup-mail/<int:user_id>', methods=['POST'])
@csrf.exempt
@token_required
def resend_setup_mail(current_user, user_id):
    """Create a new 7-day password setup token and queue the setup mail."""
    user = User.query.get(user_id)
    if not user:
        return jsonify({'message': 'User not found'}), 404

    # Generate a new password setup token. The value ends up in a URL, so
    # use a URL-safe random string.
    token = PasswordSetupToken(
        user_id=user.id,
        token=secrets.token_urlsafe(32),
        expires_at=datetime.utcnow() + timedelta(days=7)
    )
    db.session.add(token)

    # Create mail record
    mail = Mail(
        recipient=user.email,
        subject='DocuPulse Account Setup',
        body=f'Please click the following link to set up your account: {request.host_url}setup-password/{token.token}',
        status='pending'
    )
    db.session.add(mail)
    db.session.commit()

    return jsonify({'message': 'Setup mail queued for resending'})


# Connection Settings
# NOTE(review): the connection-test and Git helper endpoints below carry no
# authentication decorator and issue server-side requests to a caller-supplied
# URL. Confirm whether they should be protected (e.g. token or login required).
@admin_api.route('/test-portainer-connection', methods=['POST'])
@csrf.exempt
def test_portainer_connection():
    """Check that a Portainer instance answers ``/api/status`` with the given key."""
    data = _json_body()
    url = data.get('url')
    api_key = data.get('api_key')

    if not url or not api_key:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        # Test Portainer connection
        response = requests.get(
            f"{url.rstrip('/')}/api/status",
            headers={
                'X-API-Key': api_key,
                'Accept': 'application/json'
            },
            timeout=_CONNECTION_TEST_TIMEOUT
        )
        if response.status_code == 200:
            return jsonify({'message': 'Connection successful'})
        return jsonify({'error': 'Failed to connect to Portainer'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@admin_api.route('/test-nginx-connection', methods=['POST'])
@csrf.exempt
def test_nginx_connection():
    """Check that NGINX Proxy Manager accepts the given basic-auth credentials."""
    data = _json_body()
    url = data.get('url')
    username = data.get('username')
    password = data.get('password')

    if not url or not username or not password:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        # Test NGINX Proxy Manager connection
        response = requests.get(
            f"{url.rstrip('/')}/api/nginx/proxy-hosts",
            auth=(username, password),
            headers={'Accept': 'application/json'},
            timeout=_CONNECTION_TEST_TIMEOUT
        )
        if response.status_code == 200:
            return jsonify({'message': 'Connection successful'})
        return jsonify({'error': 'Failed to connect to NGINX Proxy Manager'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@admin_api.route('/save-portainer-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_portainer_connection(current_user):
    """Store the Portainer URL and API key under ``portainer_settings``."""
    if not _is_admin_user(current_user):
        return jsonify({'error': 'Unauthorized'}), 403

    data = _json_body()
    url = data.get('url')
    api_key = data.get('api_key')

    if not url or not api_key:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        # Save Portainer settings
        KeyValueSettings.set_value('portainer_settings', {
            'url': url,
            'api_key': api_key
        })
        return jsonify({'message': 'Settings saved successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@admin_api.route('/save-nginx-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_nginx_connection(current_user):
    """Store the NGINX Proxy Manager credentials under ``nginx_settings``."""
    if not _is_admin_user(current_user):
        return jsonify({'error': 'Unauthorized'}), 403

    data = _json_body()
    url = data.get('url')
    username = data.get('username')
    password = data.get('password')

    if not url or not username or not password:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        # Save NGINX Proxy Manager settings
        KeyValueSettings.set_value('nginx_settings', {
            'url': url,
            'username': username,
            'password': password
        })
        return jsonify({'message': 'Settings saved successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@admin_api.route('/generate-gitea-token', methods=['POST'])
@csrf.exempt
def generate_gitea_token():
    """Generate a new Gitea API token"""
    data = _json_body()
    if not _has_fields(data, 'url', 'username', 'password'):
        return jsonify({'message': 'Missing required fields'}), 400

    try:
        headers = {'Content-Type': 'application/json'}
        if data.get('otp'):
            headers['X-Gitea-OTP'] = data['otp']

        # Generate token with required scopes
        token_request = {
            'name': f'docupulse_{datetime.utcnow().strftime("%Y%m%d_%H%M%S")}',
            'scopes': [
                'read:activitypub',
                'read:issue',
                'write:misc',
                'read:notification',
                'read:organization',
                'read:package',
                'read:repository',
                'read:user'
            ]
        }

        # Make request to Gitea API
        base_url = str(data['url']).rstrip('/')
        response = requests.post(
            f'{base_url}/api/v1/users/{data["username"]}/tokens',
            headers=headers,
            json=token_request,
            auth=(data['username'], data['password']),
            timeout=_GIT_API_TIMEOUT
        )

        if response.status_code == 201:
            created = response.json()
            return jsonify({
                'token': created['sha1'],
                'name': created['name'],
                'token_last_eight': created['token_last_eight']
            }), 200
        return jsonify({
            'message': f'Failed to generate token: {_remote_error_message(response)}'
        }), 400
    except Exception as e:
        return jsonify({'message': f'Failed to generate token: {str(e)}'}), 400


@admin_api.route('/list-gitea-repos', methods=['POST'])
@csrf.exempt
def list_gitea_repos():
    """List repositories from Gitea"""
    data = _json_body()
    if not _has_fields(data, 'url', 'token'):
        return jsonify({'message': 'Missing required fields'}), 400

    try:
        # Get user's repositories
        base_url = str(data['url']).rstrip('/')
        response = _gitea_get(base_url, '/api/v1/user/repos', data['token'])

        if response.status_code == 200:
            return jsonify({'repositories': response.json()}), 200
        return jsonify({
            'message': f'Failed to list repositories: {_remote_error_message(response)}'
        }), 400
    except Exception as e:
        return jsonify({'message': f'Failed to list repositories: {str(e)}'}), 400


@admin_api.route('/list-gitlab-repos', methods=['POST'])
@csrf.exempt
def list_gitlab_repos():
    """List repositories from GitLab"""
    data = _json_body()
    if not _has_fields(data, 'url', 'token'):
        return jsonify({'message': 'Missing required fields'}), 400

    try:
        # Get user's projects (repositories)
        base_url = str(data['url']).rstrip('/')
        response = requests.get(
            f'{base_url}/api/v4/projects',
            headers={
                'PRIVATE-TOKEN': data['token'],
                'Accept': 'application/json'
            },
            params={'membership': 'true'},  # Only get projects where user is a member
            timeout=_GIT_API_TIMEOUT
        )

        if response.status_code == 200:
            return jsonify({'repositories': response.json()}), 200
        return jsonify({
            'message': f'Failed to list repositories: {_remote_error_message(response)}'
        }), 400
    except Exception as e:
        return jsonify({'message': f'Failed to list repositories: {str(e)}'}), 400


@admin_api.route('/test-git-connection', methods=['POST'])
@csrf.exempt
def test_git_connection():
    """Test the connection to a Git repository"""
    data = _json_body()
    if not _has_fields(data, 'provider', 'url', 'username', 'token', 'repo'):
        return jsonify({'message': 'Missing required fields'}), 400

    try:
        base_url = str(data['url']).rstrip('/')

        if data['provider'] == 'gitea':
            # Try to get repository information
            response = _gitea_get(
                base_url, f'/api/v1/repos/{data["repo"]}', data['token']
            )
        elif data['provider'] == 'gitlab':
            # GitLab addresses a project by its URL-encoded full path, so the
            # "/" separators (and any other reserved characters) are escaped.
            project = quote(str(data['repo']), safe='')
            response = requests.get(
                f'{base_url}/api/v4/projects/{project}',
                headers={
                    'PRIVATE-TOKEN': data['token'],
                    'Accept': 'application/json'
                },
                timeout=_GIT_API_TIMEOUT
            )
        else:
            return jsonify({'message': 'Invalid Git provider'}), 400

        if response.status_code == 200:
            return jsonify({'message': 'Connection successful'}), 200
        return jsonify({
            'message': f'Connection failed: {_remote_error_message(response)}'
        }), 400
    except Exception as e:
        return jsonify({'message': f'Connection failed: {str(e)}'}), 400
@admin_api.route('/save-git-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_git_connection(current_user):
    """Store the Git connection settings under the ``git_settings`` key.

    Expects a JSON object with ``provider`` (``gitea`` or ``gitlab``),
    ``url``, ``username``, ``token`` and ``repo``.

    Returns 403 when the caller is not an admin user, 400 when a field is
    missing or the provider is unknown, 500 when saving fails, and a
    success message otherwise.
    """
    # token_required passes None for management-tool tokens; reject those
    # explicitly instead of failing with AttributeError on None.is_admin.
    if current_user is None or not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403

    # A missing, malformed or non-object JSON body is treated as empty so it
    # is reported as missing fields rather than raising on data.get().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    settings = {
        field: data.get(field)
        for field in ('provider', 'url', 'username', 'token', 'repo')
    }
    if not all(settings.values()):
        return jsonify({'error': 'Missing required fields'}), 400

    if settings['provider'] not in ('gitea', 'gitlab'):
        return jsonify({'error': 'Invalid provider'}), 400

    try:
        # Save Git settings
        KeyValueSettings.set_value('git_settings', settings)
        return jsonify({'message': 'Settings saved successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500