Files
docupulse/routes/admin_api.py
2025-06-14 18:18:43 +02:00

1158 lines
39 KiB
Python

from flask import Blueprint, jsonify, request, current_app, make_response, flash, redirect, url_for
from functools import wraps
from models import (
KeyValueSettings, User, Room, Conversation, RoomFile,
SiteSettings, DocuPulseSettings, Event, Mail, ManagementAPIKey
)
from extensions import db, csrf
from datetime import datetime, timedelta
import os
import jwt
from werkzeug.security import generate_password_hash
import secrets
from flask_login import login_user
import requests
import json
# Blueprint on which every route, before_request and after_request hook in this module is registered.
admin_api = Blueprint('admin_api', __name__)
def add_cors_headers(response):
    """Set the CORS headers used by this blueprint on ``response`` and return it.

    The same response object is mutated in place and handed back, so the
    function can be used directly in a ``return`` statement.
    """
    cors_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS'),
        ('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-API-Key, X-CSRF-Token'),
    )
    for header_name, header_value in cors_headers:
        response.headers[header_name] = header_value
    return response
@admin_api.before_request
def handle_preflight():
    """Short-circuit OPTIONS (CORS preflight) requests with an empty CORS-enabled response."""
    if request.method != 'OPTIONS':
        # Nothing to do for regular requests; returning None leaves them untouched.
        return None
    return add_cors_headers(make_response())
@admin_api.after_request
def after_request(response):
    """Stamp the CORS headers onto every response leaving this blueprint."""
    with_cors = add_cors_headers(response)
    return with_cors
def token_required(f):
    """Decorator that authenticates a request through a JWT in the Authorization header.

    The header is expected in the form ``<scheme> <token>``. The token is
    decoded with the application's ``SECRET_KEY`` (HS256).

    * A token carrying ``is_management`` calls the view with ``None`` as
      ``current_user``.
    * Otherwise the token's ``user_id`` must resolve to a user with
      ``is_admin`` set; that user is passed as ``current_user``.

    Returns a 401 JSON response when the token is missing, cannot be decoded,
    or does not identify an admin user. Exceptions raised by the wrapped view
    itself are NOT converted into a 401; they propagate to Flask.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        # A header without a second part (e.g. just "Bearer") counts as "no token"
        # instead of raising IndexError.
        header_parts = request.headers.get('Authorization', '').split()
        token = header_parts[1] if len(header_parts) >= 2 else None
        if not token:
            return jsonify({'message': 'Token is missing!'}), 401

        # Only the decode step is guarded: a failure here really is an invalid token.
        try:
            data = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])
        except jwt.InvalidTokenError:
            return jsonify({'message': 'Invalid token!'}), 401

        # Management tool tokens have no backing user; the view receives None.
        if data.get('is_management'):
            return f(None, *args, **kwargs)

        if 'user_id' not in data:
            return jsonify({'message': 'Invalid token!'}), 401

        current_user = User.query.get(data['user_id'])
        if not current_user or not current_user.is_admin:
            return jsonify({'message': 'Invalid token or insufficient permissions!'}), 401
        return f(current_user, *args, **kwargs)
    return decorated
def generate_management_api_key():
    """Return a new random, URL-safe API key for the management tool.

    The key is produced by ``secrets.token_urlsafe`` from 32 random bytes.
    """
    random_byte_count = 32
    return secrets.token_urlsafe(random_byte_count)
def validate_management_api_key(api_key):
    """Return True when ``api_key`` matches an active ManagementAPIKey row.

    On a match the row's ``last_used_at`` is set to the current UTC time and
    committed before returning. Returns False when no active key matches.
    """
    matching_key = ManagementAPIKey.query.filter_by(api_key=api_key, is_active=True).first()
    if not matching_key:
        return False
    matching_key.last_used_at = datetime.utcnow()
    db.session.commit()
    return True
@admin_api.route('/login', methods=['POST'])
@csrf.exempt
def admin_login():
    """Authenticate an admin user from a JSON or form submission.

    A request counts as an API request when its ``Accept`` header is exactly
    ``application/json`` or its body mimetype is ``application/json``
    (parameters such as ``charset`` are ignored).

    * API request: returns ``{'token', 'status'}`` with a JWT valid for one
      day, or a JSON error with status 400/401/500.
    * Web request: logs the user in via ``login_user`` and redirects to
      ``main.dashboard``; on failure flashes the error and redirects to
      ``auth.login``.
    """
    # Computed outside the try block so the error handler below can always read it.
    is_api_request = (
        request.headers.get('Accept') == 'application/json'
        or request.mimetype == 'application/json'
    )

    def _fail(message, status_code):
        """Report a failure in the style matching the request kind."""
        if is_api_request:
            return jsonify({'message': message, 'status': 'error'}), status_code
        flash(message, 'error')
        return redirect(url_for('auth.login'))

    try:
        if is_api_request:
            # silent=True: an unparsable body becomes None and is answered with 400 below.
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                data = None
        else:
            data = request.form

        if not data or 'email' not in data or 'password' not in data:
            return _fail('Email and password are required', 400)

        user = User.query.filter_by(email=data['email']).first()
        if not user or not user.is_admin or not user.check_password(data['password']):
            return _fail('Invalid credentials or not an admin', 401)

        # For API requests, return JWT token
        if is_api_request:
            token = jwt.encode({
                'user_id': user.id,
                'is_admin': True,
                'exp': datetime.utcnow() + timedelta(days=1)  # Token expires in 1 day
            }, current_app.config['SECRET_KEY'], algorithm="HS256")
            return jsonify({
                'token': token,
                'status': 'success'
            }), 200

        # For web requests, use session-based auth
        login_user(user)
        return redirect(url_for('main.dashboard'))
    except Exception as e:
        current_app.logger.error(f"Login error: {str(e)}")
        return _fail('An error occurred during login', 500)
@admin_api.route('/management-token', methods=['POST'])
@csrf.exempt
def get_management_token():
    """Exchange a valid ``X-API-Key`` header for a management JWT.

    Responds 401 when the header is absent or the key fails
    ``validate_management_api_key``. The issued token carries no ``exp``
    claim.
    """
    supplied_key = request.headers.get('X-API-Key')
    key_accepted = bool(supplied_key) and validate_management_api_key(supplied_key)
    if not key_accepted:
        return jsonify({'message': 'Invalid API key'}), 401

    # user_id 0 is the placeholder identity used for the management tool.
    claims = {
        'user_id': 0,
        'is_management': True
    }
    signed_token = jwt.encode(claims, current_app.config['SECRET_KEY'], algorithm="HS256")
    return jsonify({'token': signed_token})
@admin_api.route('/management-api-key', methods=['POST'])
@csrf.exempt
@token_required
def create_management_api_key(current_user):
    """Create a new API key for the management tool (only accessible by admin users).

    Expects a JSON object with a ``name``. Responds 403 when the caller is not
    an admin user (including management-tool tokens, for which
    ``current_user`` is None), 400 when ``name`` is missing, and 201 with the
    generated key on success.
    """
    # token_required passes None for management tokens; those may not mint keys.
    if current_user is None or not current_user.is_admin:
        return jsonify({'message': 'Insufficient permissions'}), 403
    data = request.get_json()
    if not isinstance(data, dict) or 'name' not in data:
        return jsonify({'message': 'Name is required'}), 400
    api_key = generate_management_api_key()
    key = ManagementAPIKey(
        api_key=api_key,
        name=data['name'],
        created_by=current_user.id
    )
    db.session.add(key)
    db.session.commit()
    return jsonify({
        'api_key': api_key,
        'name': key.name,
        'created_at': key.created_at.isoformat(),
        'message': 'API key generated successfully. Store this key securely as it will not be shown again.'
    }), 201
@admin_api.route('/management-api-keys', methods=['GET'])
@csrf.exempt
@token_required
def list_management_api_keys(current_user):
    """List all management API keys (only accessible by admin users).

    The key material itself is not included in the listing. Responds 403 when
    the caller is not an admin user (``current_user`` is None for
    management-tool tokens).
    """
    if current_user is None or not current_user.is_admin:
        return jsonify({'message': 'Insufficient permissions'}), 403
    keys = ManagementAPIKey.query.all()
    return jsonify([{
        'id': key.id,
        'name': key.name,
        'created_at': key.created_at.isoformat(),
        'last_used_at': key.last_used_at.isoformat() if key.last_used_at else None,
        'is_active': key.is_active,
        'created_by': key.created_by
    } for key in keys])
@admin_api.route('/management-api-key/<int:key_id>', methods=['DELETE'])
@csrf.exempt
@token_required
def revoke_management_api_key(current_user, key_id):
    """Revoke a management API key (only accessible by admin users).

    The row is kept and marked ``is_active = False``. Responds 403 when the
    caller is not an admin user (``current_user`` is None for management-tool
    tokens) and 404 when no key has the given id.
    """
    if current_user is None or not current_user.is_admin:
        return jsonify({'message': 'Insufficient permissions'}), 403
    key = ManagementAPIKey.query.get(key_id)
    if not key:
        return jsonify({'message': 'API key not found'}), 404
    key.is_active = False
    db.session.commit()
    return jsonify({'message': 'API key revoked successfully'})
# Key-Value Settings CRUD
@admin_api.route('/key-value', methods=['GET'])
@csrf.exempt
@token_required
def get_key_values(current_user):
    """Return every KeyValueSettings row as a JSON list of ``{'key', 'value'}`` objects."""
    pairs = []
    for entry in KeyValueSettings.query.all():
        pairs.append({'key': entry.key, 'value': entry.value})
    return jsonify(pairs)
@admin_api.route('/key-value/<key>', methods=['GET'])
@token_required
def get_key_value(current_user, key):
    """Return the setting stored under ``key``, or a 404 message when there is none."""
    found = KeyValueSettings.query.filter_by(key=key).first()
    if found:
        return jsonify({'key': found.key, 'value': found.value})
    return jsonify({'message': 'Key not found'}), 404
@admin_api.route('/key-value', methods=['POST'])
@csrf.exempt
@token_required
def create_key_value(current_user):
    """Create a key-value setting from a JSON object with ``key`` and ``value``.

    Responds 400 when the body is not a JSON object, when either field is
    missing, or when a setting with the same key already exists; 201 on
    success.
    """
    data = request.get_json()
    if not isinstance(data, dict) or 'key' not in data or 'value' not in data:
        return jsonify({'message': 'Missing key or value'}), 400
    # The other key-value routes look settings up with filter_by(key=...).first(),
    # so a second row with the same key would be unreachable; reject it up front.
    if KeyValueSettings.query.filter_by(key=data['key']).first():
        return jsonify({'message': 'Key already exists'}), 400
    setting = KeyValueSettings(key=data['key'], value=data['value'])
    db.session.add(setting)
    db.session.commit()
    return jsonify({'message': 'Key-value pair created successfully'}), 201
@admin_api.route('/key-value/<key>', methods=['PUT'])
@csrf.exempt
@token_required
def update_key_value(current_user, key):
    """Replace the value of the setting stored under ``key``.

    Responds 404 when the key does not exist and 400 when the JSON body has
    no ``value`` field.
    """
    existing = KeyValueSettings.query.filter_by(key=key).first()
    if not existing:
        return jsonify({'message': 'Key not found'}), 404

    payload = request.get_json()
    has_value = bool(payload) and 'value' in payload
    if not has_value:
        return jsonify({'message': 'Missing value'}), 400

    existing.value = payload['value']
    db.session.commit()
    return jsonify({'message': 'Key-value pair updated successfully'})
@admin_api.route('/key-value/<key>', methods=['DELETE'])
@csrf.exempt
@token_required
def delete_key_value(current_user, key):
    """Delete the setting stored under ``key``; 404 when it does not exist."""
    doomed = KeyValueSettings.query.filter_by(key=key).first()
    if doomed:
        db.session.delete(doomed)
        db.session.commit()
        return jsonify({'message': 'Key-value pair deleted successfully'})
    return jsonify({'message': 'Key not found'}), 404
# Contacts (Users) CRUD
@admin_api.route('/contacts', methods=['GET'])
@csrf.exempt
@token_required
def get_contacts(current_user):
    """Return all users as a JSON list of contact records.

    Each record carries the user's profile fields plus a derived ``role``:
    ``'admin'`` when ``is_admin`` is set, else ``'manager'`` when
    ``is_manager`` is set, else ``'user'``.
    """
    def as_contact(person):
        # Admin takes precedence over manager when both flags are set.
        if person.is_admin:
            role_name = 'admin'
        elif person.is_manager:
            role_name = 'manager'
        else:
            role_name = 'user'
        return {
            'id': person.id,
            'username': person.username,
            'email': person.email,
            'last_name': person.last_name,
            'phone': person.phone,
            'company': person.company,
            'position': person.position,
            'is_active': person.is_active,
            'is_admin': person.is_admin,
            'is_manager': person.is_manager,
            'role': role_name,
            'created_at': person.created_at.isoformat()
        }

    return jsonify([as_contact(person) for person in User.query.all()])
@admin_api.route('/contacts/<int:user_id>', methods=['GET'])
@csrf.exempt
@token_required
def get_contact(current_user, user_id):
    """Return the contact record for ``user_id``, or a 404 message when no such user exists.

    ``role`` is derived from the flags: ``'admin'`` if ``is_admin``, else
    ``'manager'`` if ``is_manager``, else ``'user'``.
    """
    person = User.query.get(user_id)
    if not person:
        return jsonify({'message': 'User not found'}), 404

    if person.is_admin:
        role_name = 'admin'
    elif person.is_manager:
        role_name = 'manager'
    else:
        role_name = 'user'

    record = {
        'id': person.id,
        'username': person.username,
        'email': person.email,
        'last_name': person.last_name,
        'phone': person.phone,
        'company': person.company,
        'position': person.position,
        'is_active': person.is_active,
        'is_admin': person.is_admin,
        'is_manager': person.is_manager,
        'role': role_name,
        'created_at': person.created_at.isoformat()
    }
    return jsonify(record)
@admin_api.route('/contacts', methods=['POST'])
@csrf.exempt
@token_required
def create_contact(current_user):
    """Create a user from a JSON object.

    Required fields: ``username``, ``email``, ``last_name`` and ``role``
    (one of ``admin``, ``manager``, ``user``). Optional: ``phone``,
    ``company``, ``position``, ``password``.

    Responds 400 when the body is not a JSON object, a required field is
    missing, the email is already taken or the role is invalid; 201 with the
    new user's id on success.
    """
    data = request.get_json()
    required_fields = ('username', 'email', 'last_name', 'role')
    # A missing/non-object body is reported as a client error, not a TypeError.
    if not isinstance(data, dict) or not all(field in data for field in required_fields):
        return jsonify({'message': 'Missing required fields'}), 400
    if User.query.filter_by(email=data['email']).first():
        return jsonify({'message': 'Email already exists'}), 400
    # Validate role
    if data['role'] not in ('admin', 'manager', 'user'):
        return jsonify({'message': 'Invalid role'}), 400
    user = User(
        username=data['username'],
        email=data['email'],
        last_name=data['last_name'],
        phone=data.get('phone'),
        company=data.get('company'),
        position=data.get('position'),
        is_admin=data['role'] == 'admin',
        is_manager=data['role'] == 'manager'
    )
    # NOTE(review): when no password is supplied the account gets the fixed,
    # well-known password 'changeme' (this includes accounts created with the
    # admin role). Behavior kept as-is; consider a random password plus the
    # setup-mail flow instead.
    user.set_password(data.get('password', 'changeme'))
    db.session.add(user)
    db.session.commit()
    return jsonify({'message': 'Contact created successfully', 'id': user.id}), 201
@admin_api.route('/contacts/<int:user_id>', methods=['PUT'])
@csrf.exempt
@token_required
def update_contact(current_user, user_id):
    """Update the user ``user_id`` from a JSON object of changed fields.

    Recognised fields: ``email``, ``role`` (``admin``/``manager``/``user``),
    ``username``, ``last_name``, ``phone``, ``company``, ``position``,
    ``is_active`` and ``password``. Fields that are absent keep their
    current value.

    Responds 404 when the user does not exist and 400 when the body is not a
    JSON object, the new email is already taken, or the role is invalid.
    All validation happens before the user object is modified.
    """
    user = User.query.get(user_id)
    if not user:
        return jsonify({'message': 'User not found'}), 404
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'message': 'No data provided'}), 400

    # Validate first, mutate afterwards, so a rejected request leaves the
    # loaded user object untouched.
    email_changed = 'email' in data and data['email'] != user.email
    if email_changed and User.query.filter_by(email=data['email']).first():
        return jsonify({'message': 'Email already exists'}), 400
    if 'role' in data and data['role'] not in ('admin', 'manager', 'user'):
        return jsonify({'message': 'Invalid role'}), 400

    if email_changed:
        user.email = data['email']
    # Update role if provided
    if 'role' in data:
        user.is_admin = data['role'] == 'admin'
        user.is_manager = data['role'] == 'manager'
    user.username = data.get('username', user.username)
    user.last_name = data.get('last_name', user.last_name)
    user.phone = data.get('phone', user.phone)
    user.company = data.get('company', user.company)
    user.position = data.get('position', user.position)
    user.is_active = data.get('is_active', user.is_active)
    if 'password' in data:
        user.set_password(data['password'])
    db.session.commit()
    return jsonify({'message': 'Contact updated successfully'})
@admin_api.route('/contacts/<int:user_id>', methods=['DELETE'])
@csrf.exempt
@token_required
def delete_contact(current_user, user_id):
    """Delete the user ``user_id``; 404 when no such user exists."""
    target = User.query.get(user_id)
    if target:
        db.session.delete(target)
        db.session.commit()
        return jsonify({'message': 'Contact deleted successfully'})
    return jsonify({'message': 'User not found'}), 404
# Statistics
@admin_api.route('/statistics', methods=['GET'])
@csrf.exempt
@token_required
def get_statistics(current_user):
    """Return the room count, the conversation count and the summed size of all room files."""
    room_count = Room.query.count()
    conversation_count = Conversation.query.count()
    # Fetch only the size column instead of loading every full RoomFile row;
    # rows whose size is empty (None/0) contribute nothing, as before.
    size_rows = RoomFile.query.with_entities(RoomFile.size).all()
    total_storage = sum(size for (size,) in size_rows if size)
    return jsonify({
        'rooms': room_count,
        'conversations': conversation_count,
        'total_storage': total_storage
    })
# Website Settings CRUD
@admin_api.route('/settings', methods=['GET'])
@csrf.exempt
@token_required
def get_settings(current_user):
    """Return the branding and company fields of the site settings as a JSON object."""
    exposed_fields = (
        'primary_color',
        'secondary_color',
        'company_name',
        'company_logo',
        'company_website',
        'company_email',
        'company_phone',
        'company_address',
        'company_city',
        'company_state',
        'company_zip',
        'company_country',
        'company_description',
        'company_industry',
    )
    site = SiteSettings.get_settings()
    return jsonify({field: getattr(site, field) for field in exposed_fields})
@admin_api.route('/settings', methods=['PUT'])
@csrf.exempt
@token_required
def update_settings(current_user):
    """Update site settings from a JSON object of ``attribute: value`` pairs.

    Keys that do not name an existing attribute of the settings object are
    ignored. Keys naming ``id``, a private attribute (leading underscore) or
    a callable attribute (a method) are also ignored so a request cannot
    overwrite them. Responds 400 when the body is not a JSON object.
    """
    settings = SiteSettings.get_settings()
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'message': 'Invalid settings payload'}), 400
    for key, value in data.items():
        if key == 'id' or key.startswith('_'):
            continue
        if not hasattr(settings, key):
            continue
        # Never replace a method on the settings object with request data.
        if callable(getattr(settings, key)):
            continue
        setattr(settings, key, value)
    db.session.commit()
    return jsonify({'message': 'Settings updated successfully'})
# Website Logs
@admin_api.route('/logs', methods=['GET'])
@csrf.exempt
@token_required
def get_logs(current_user):
    """Return one page of Event rows, newest first, together with paging totals.

    Query parameters ``page`` (default 1) and ``per_page`` (default 50) are
    read as integers.
    """
    requested_page = request.args.get('page', 1, type=int)
    page_size = request.args.get('per_page', 50, type=int)
    newest_first = Event.query.order_by(Event.timestamp.desc())
    pagination = newest_first.paginate(
        page=requested_page, per_page=page_size, error_out=False
    )

    serialized = []
    for entry in pagination.items:
        serialized.append({
            'id': entry.id,
            'event_type': entry.event_type,
            'user_id': entry.user_id,
            'timestamp': entry.timestamp.isoformat(),
            'details': entry.details,
            'ip_address': entry.ip_address,
            'user_agent': entry.user_agent
        })

    return jsonify({
        'events': serialized,
        'total': pagination.total,
        'pages': pagination.pages,
        'current_page': pagination.page
    })
# Mail Logs
@admin_api.route('/mail-logs', methods=['GET'])
@csrf.exempt
@token_required
def get_mail_logs(current_user):
    """Return one page of Mail rows, newest first by ``created_at``, with paging totals.

    Query parameters ``page`` (default 1) and ``per_page`` (default 50) are
    read as integers.
    """
    requested_page = request.args.get('page', 1, type=int)
    page_size = request.args.get('per_page', 50, type=int)
    newest_first = Mail.query.order_by(Mail.created_at.desc())
    pagination = newest_first.paginate(
        page=requested_page, per_page=page_size, error_out=False
    )

    serialized = []
    for message in pagination.items:
        if message.sent_at:
            sent_at = message.sent_at.isoformat()
        else:
            sent_at = None
        serialized.append({
            'id': message.id,
            'recipient': message.recipient,
            'subject': message.subject,
            'status': message.status,
            'created_at': message.created_at.isoformat(),
            'sent_at': sent_at,
            'template_id': message.template_id
        })

    return jsonify({
        'mails': serialized,
        'total': pagination.total,
        'pages': pagination.pages,
        'current_page': pagination.page
    })
# Resend Setup Mail
@admin_api.route('/resend-setup-mail/<int:user_id>', methods=['POST'])
@csrf.exempt
@token_required
def resend_setup_mail(current_user, user_id):
    """Queue a new account-setup mail for the user ``user_id``.

    Creates a password setup token valid for seven days and a ``pending``
    Mail record whose body links to ``setup-password/<token>`` under the
    request's host URL. Responds 404 when the user does not exist.
    """
    # NOTE(review): PasswordSetupToken is not among the names imported at the
    # top of this module, so the original body could not run. It is imported
    # here on the assumption that it lives in `models` next to the other
    # models — confirm.
    from models import PasswordSetupToken

    user = User.query.get(user_id)
    if not user:
        return jsonify({'message': 'User not found'}), 404
    # The token is embedded in a URL path, so use a random URL-safe string
    # rather than a password hash (whose format contains '$' and ':').
    token = PasswordSetupToken(
        user_id=user.id,
        token=secrets.token_urlsafe(32),
        expires_at=datetime.utcnow() + timedelta(days=7)
    )
    db.session.add(token)
    # Create mail record
    mail = Mail(
        recipient=user.email,
        subject='DocuPulse Account Setup',
        body=f'Please click the following link to set up your account: {request.host_url}setup-password/{token.token}',
        status='pending'
    )
    db.session.add(mail)
    db.session.commit()
    return jsonify({'message': 'Setup mail queued for resending'})
# Connection Settings
@admin_api.route('/test-portainer-connection', methods=['POST'])
@csrf.exempt
def test_portainer_connection():
    """Check that a Portainer instance answers ``GET <url>/api/status`` with the given API key.

    Expects a JSON object with ``url`` and ``api_key``. Responds 400 when a
    field is missing or Portainer does not answer with status 200, and 500
    with the exception text when the request itself fails.
    """
    # NOTE(review): this route has no @token_required and performs a
    # server-side request to a caller-supplied URL — confirm that it is
    # protected elsewhere.
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'error': 'Missing required fields'}), 400
    url = data.get('url')
    api_key = data.get('api_key')
    if not url or not api_key:
        return jsonify({'error': 'Missing required fields'}), 400
    try:
        # Test Portainer connection
        response = requests.get(
            f"{url.rstrip('/')}/api/status",
            headers={
                'X-API-Key': api_key,
                'Accept': 'application/json'
            },
            timeout=5
        )
        if response.status_code == 200:
            return jsonify({'message': 'Connection successful'})
        return jsonify({'error': 'Failed to connect to Portainer'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@admin_api.route('/test-nginx-connection', methods=['POST'])
@csrf.exempt
def test_nginx_connection():
    """Check that NGINX Proxy Manager accepts the given credentials.

    Expects a JSON object with ``url``, ``username`` and ``password``. First
    requests a token from ``<url>/api/tokens``, then lists
    ``<url>/api/nginx/proxy-hosts`` with it. Responds 400 when a field is
    missing or either step fails, and 500 with the exception text when a
    request raises.
    """
    # NOTE(review): this route has no @token_required and performs
    # server-side requests to a caller-supplied URL — confirm that it is
    # protected elsewhere.
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'error': 'Missing required fields'}), 400
    url = data.get('url')
    username = data.get('username')
    password = data.get('password')
    if not url or not username or not password:
        return jsonify({'error': 'Missing required fields'}), 400
    try:
        base_url = url.rstrip('/')
        # First, get the JWT token
        token_response = requests.post(
            f"{base_url}/api/tokens",
            json={
                'identity': username,
                'secret': password
            },
            headers={'Content-Type': 'application/json'},
            timeout=5
        )
        if token_response.status_code != 200:
            return jsonify({'error': 'Failed to authenticate with NGINX Proxy Manager'}), 400
        token = token_response.json().get('token')
        if not token:
            return jsonify({'error': 'No token received from NGINX Proxy Manager'}), 400
        # Now test the connection using the token
        response = requests.get(
            f"{base_url}/api/nginx/proxy-hosts",
            headers={
                'Authorization': f'Bearer {token}',
                'Accept': 'application/json'
            },
            timeout=5
        )
        if response.status_code == 200:
            return jsonify({'message': 'Connection successful'})
        return jsonify({'error': 'Failed to connect to NGINX Proxy Manager'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@admin_api.route('/save-portainer-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_portainer_connection(current_user):
    """Store the Portainer ``url`` and ``api_key`` under the ``portainer_settings`` key.

    Responds 403 when the caller is not an admin user (``current_user`` is
    None for management-tool tokens), 400 when a field is missing, and 500
    with the exception text when saving fails.
    """
    if current_user is None or not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'error': 'Missing required fields'}), 400
    url = data.get('url')
    api_key = data.get('api_key')
    if not url or not api_key:
        return jsonify({'error': 'Missing required fields'}), 400
    try:
        # Save Portainer settings
        KeyValueSettings.set_value('portainer_settings', {
            'url': url,
            'api_key': api_key
        })
        return jsonify({'message': 'Settings saved successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@admin_api.route('/save-nginx-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_nginx_connection(current_user):
    """Store the NGINX Proxy Manager ``url``, ``username`` and ``password`` under ``nginx_settings``.

    Responds 403 when the caller is not an admin user (``current_user`` is
    None for management-tool tokens), 400 when a field is missing, and 500
    with the exception text when saving fails.
    """
    if current_user is None or not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'error': 'Missing required fields'}), 400
    url = data.get('url')
    username = data.get('username')
    password = data.get('password')
    if not url or not username or not password:
        return jsonify({'error': 'Missing required fields'}), 400
    try:
        # Save NGINX Proxy Manager settings
        KeyValueSettings.set_value('nginx_settings', {
            'url': url,
            'username': username,
            'password': password
        })
        return jsonify({'message': 'Settings saved successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@admin_api.route('/generate-gitea-token', methods=['POST'])
@csrf.exempt
def generate_gitea_token():
    """Generate a new Gitea API token.

    Expects a JSON object with ``url``, ``username`` and ``password`` and an
    optional ``otp`` (forwarded as the ``X-Gitea-OTP`` header). Calls
    ``POST <url>/api/v1/users/<username>/tokens`` with basic auth and a fixed
    scope list. Responds 200 with the token details when Gitea answers 201,
    otherwise 400 with a failure message.
    """
    # NOTE(review): this route has no @token_required and performs a
    # server-side request to a caller-supplied URL — confirm that it is
    # protected elsewhere.
    data = request.get_json()
    if (not isinstance(data, dict) or 'url' not in data
            or 'username' not in data or 'password' not in data):
        return jsonify({'message': 'Missing required fields'}), 400
    try:
        headers = {
            'Content-Type': 'application/json'
        }
        if data.get('otp'):
            headers['X-Gitea-OTP'] = data['otp']
        # Generate token with required scopes
        token_data = {
            'name': f'docupulse_{datetime.utcnow().strftime("%Y%m%d_%H%M%S")}',
            'scopes': [
                'read:activitypub',
                'read:issue',
                'write:misc',
                'read:notification',
                'read:organization',
                'read:package',
                'read:repository',
                'read:user'
            ]
        }
        base_url = data['url'].rstrip('/')
        # Make request to Gitea API; bounded so an unreachable host cannot
        # block the worker indefinitely.
        response = requests.post(
            f'{base_url}/api/v1/users/{data["username"]}/tokens',
            headers=headers,
            json=token_data,
            auth=(data['username'], data['password']),
            timeout=10
        )
        if response.status_code == 201:
            token_data = response.json()
            return jsonify({
                'token': token_data['sha1'],
                'name': token_data['name'],
                'token_last_eight': token_data['token_last_eight']
            }), 200
        return jsonify({'message': f'Failed to generate token: {response.json().get("message", "Unknown error")}'}), 400
    except Exception as e:
        return jsonify({'message': f'Failed to generate token: {str(e)}'}), 400
@admin_api.route('/list-gitea-repos', methods=['POST'])
@csrf.exempt
def list_gitea_repos():
    """List repositories from Gitea.

    Expects a JSON object with ``url`` and ``token``. Queries
    ``<url>/api/v1/user/repos`` with the token in the ``Authorization``
    header and, when that does not return 200, again with the token as a
    ``token`` query parameter. Responds 200 with ``repositories`` or 400 with
    a failure message.
    """
    # NOTE(review): this route has no @token_required and performs
    # server-side requests to a caller-supplied URL — confirm that it is
    # protected elsewhere.
    data = request.get_json()
    if not isinstance(data, dict) or 'url' not in data or 'token' not in data:
        return jsonify({'message': 'Missing required fields'}), 400
    try:
        repos_url = f'{data["url"].rstrip("/")}/api/v1/user/repos'
        # First try token in Authorization header
        response = requests.get(
            repos_url,
            headers={
                'Accept': 'application/json',
                'Authorization': f'token {data["token"]}'
            },
            timeout=10
        )
        # If that fails, try token as query parameter (passed via params so
        # it is URL-encoded correctly)
        if response.status_code != 200:
            response = requests.get(
                repos_url,
                params={'token': data['token']},
                headers={'Accept': 'application/json'},
                timeout=10
            )
        if response.status_code == 200:
            return jsonify({
                'repositories': response.json()
            }), 200
        return jsonify({'message': f'Failed to list repositories: {response.json().get("message", "Unknown error")}'}), 400
    except Exception as e:
        return jsonify({'message': f'Failed to list repositories: {str(e)}'}), 400
@admin_api.route('/list-gitea-branches', methods=['POST'])
@csrf.exempt
def list_gitea_branches():
    """List branches from a Gitea repository.

    Expects a JSON object with ``url``, ``token`` and ``repo``. Queries
    ``<url>/api/v1/repos/<repo>/branches`` with the token in the
    ``Authorization`` header and, when that does not return 200, again with
    the token as a ``token`` query parameter. Responds 200 with ``branches``
    or 400 with a failure message.
    """
    # NOTE(review): this route has no @token_required and performs
    # server-side requests to a caller-supplied URL — confirm that it is
    # protected elsewhere.
    data = request.get_json()
    if (not isinstance(data, dict) or 'url' not in data
            or 'token' not in data or 'repo' not in data):
        return jsonify({'message': 'Missing required fields'}), 400
    try:
        branches_url = f'{data["url"].rstrip("/")}/api/v1/repos/{data["repo"]}/branches'
        # First try token in Authorization header
        response = requests.get(
            branches_url,
            headers={
                'Accept': 'application/json',
                'Authorization': f'token {data["token"]}'
            },
            timeout=10
        )
        # If that fails, try token as query parameter (passed via params so
        # it is URL-encoded correctly)
        if response.status_code != 200:
            response = requests.get(
                branches_url,
                params={'token': data['token']},
                headers={'Accept': 'application/json'},
                timeout=10
            )
        if response.status_code == 200:
            return jsonify({
                'branches': response.json()
            }), 200
        return jsonify({'message': f'Failed to list branches: {response.json().get("message", "Unknown error")}'}), 400
    except Exception as e:
        return jsonify({'message': f'Failed to list branches: {str(e)}'}), 400
@admin_api.route('/list-gitlab-repos', methods=['POST'])
@csrf.exempt
def list_gitlab_repos():
    """List repositories from GitLab.

    Expects a JSON object with ``url`` and ``token``. Queries
    ``<url>/api/v4/projects`` with ``membership=true`` and the token in the
    ``PRIVATE-TOKEN`` header. Responds 200 with ``repositories`` or 400 with
    a failure message.
    """
    # NOTE(review): this route has no @token_required and performs a
    # server-side request to a caller-supplied URL — confirm that it is
    # protected elsewhere.
    data = request.get_json()
    if not isinstance(data, dict) or 'url' not in data or 'token' not in data:
        return jsonify({'message': 'Missing required fields'}), 400
    try:
        headers = {
            'PRIVATE-TOKEN': data['token'],
            'Accept': 'application/json'
        }
        # Get user's projects (repositories); bounded so an unreachable host
        # cannot block the worker indefinitely.
        response = requests.get(
            f'{data["url"].rstrip("/")}/api/v4/projects',
            headers=headers,
            params={'membership': 'true'},  # Only get projects where user is a member
            timeout=10
        )
        if response.status_code == 200:
            return jsonify({
                'repositories': response.json()
            }), 200
        return jsonify({'message': f'Failed to list repositories: {response.json().get("message", "Unknown error")}'}), 400
    except Exception as e:
        return jsonify({'message': f'Failed to list repositories: {str(e)}'}), 400
@admin_api.route('/test-git-connection', methods=['POST'])
@csrf.exempt
def test_git_connection():
    """Test the connection to a Git repository.

    Expects a JSON object with ``provider`` (``gitea`` or ``gitlab``),
    ``url``, ``username``, ``token`` and ``repo``.

    * gitea: fetches ``<url>/api/v1/repos/<repo>`` with the token in the
      ``Authorization`` header, retrying with a ``token`` query parameter.
    * gitlab: fetches ``<url>/api/v4/projects/<repo>`` (slashes in ``repo``
      encoded as ``%2F``) with the ``PRIVATE-TOKEN`` header.

    Responds 200 on success and 400 with a message otherwise.
    """
    # NOTE(review): this route has no @token_required and performs
    # server-side requests to a caller-supplied URL — confirm that it is
    # protected elsewhere.
    data = request.get_json()
    required_fields = ('provider', 'url', 'username', 'token', 'repo')
    if not isinstance(data, dict) or not all(field in data for field in required_fields):
        return jsonify({'message': 'Missing required fields'}), 400
    try:
        base_url = data['url'].rstrip('/')
        if data['provider'] == 'gitea':
            repo_url = f'{base_url}/api/v1/repos/{data["repo"]}'
            # First try token in Authorization header
            response = requests.get(
                repo_url,
                headers={
                    'Accept': 'application/json',
                    'Authorization': f'token {data["token"]}'
                },
                timeout=10
            )
            # If that fails, try token as query parameter (passed via params
            # so it is URL-encoded correctly)
            if response.status_code != 200:
                response = requests.get(
                    repo_url,
                    params={'token': data['token']},
                    headers={'Accept': 'application/json'},
                    timeout=10
                )
        elif data['provider'] == 'gitlab':
            # GitLab addresses a project by its URL-encoded path.
            response = requests.get(
                f'{base_url}/api/v4/projects/{data["repo"].replace("/", "%2F")}',
                headers={
                    'PRIVATE-TOKEN': data['token'],
                    'Accept': 'application/json'
                },
                timeout=10
            )
        else:
            return jsonify({'message': 'Invalid Git provider'}), 400

        if response.status_code == 200:
            return jsonify({'message': 'Connection successful'}), 200
        return jsonify({'message': f'Connection failed: {response.json().get("message", "Unknown error")}'}), 400
    except Exception as e:
        return jsonify({'message': f'Connection failed: {str(e)}'}), 400
@admin_api.route('/save-git-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_git_connection(current_user):
    """Store the Git connection settings under the ``git_settings`` key.

    Expects a JSON object with ``provider`` (``gitea`` or ``gitlab``),
    ``url``, ``username``, ``token`` and ``repo``. Responds 403 when the
    caller is not an admin user (``current_user`` is None for
    management-tool tokens), 400 when a field is missing or the provider is
    invalid, and 500 with the exception text when saving fails.
    """
    if current_user is None or not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'error': 'Missing required fields'}), 400
    provider = data.get('provider')
    url = data.get('url')
    username = data.get('username')
    token = data.get('token')
    repo = data.get('repo')
    if not provider or not url or not username or not token or not repo:
        return jsonify({'error': 'Missing required fields'}), 400
    if provider not in ('gitea', 'gitlab'):
        return jsonify({'error': 'Invalid provider'}), 400
    try:
        # Save Git settings
        KeyValueSettings.set_value('git_settings', {
            'provider': provider,
            'url': url,
            'username': username,
            'token': token,
            'repo': repo
        })
        return jsonify({'message': 'Settings saved successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@admin_api.route('/create-proxy-host', methods=['POST'])
@csrf.exempt
@token_required
def create_proxy_host(current_user):
    """Create a proxy host in NGINX Proxy Manager using the saved ``nginx_settings``.

    Expects a JSON object with ``domains``, ``forward_ip``, ``forward_port``
    and an optional ``scheme`` (default ``http``). Authenticates against
    ``<nginx url>/api/tokens`` and posts the host definition to
    ``<nginx url>/api/nginx/proxy-hosts``.

    Responds 403 when the caller is not an admin user (``current_user`` is
    None for management-tool tokens), 400 for missing/invalid input,
    missing settings or a rejected request, and 500 with the exception text
    when a request raises.
    """
    if current_user is None or not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403
    data = request.get_json()
    if not isinstance(data, dict):
        return jsonify({'error': 'Missing required fields'}), 400
    domains = data.get('domains')
    scheme = data.get('scheme', 'http')
    forward_ip = data.get('forward_ip')
    forward_port = data.get('forward_port')
    if not domains or not forward_ip or not forward_port:
        return jsonify({'error': 'Missing required fields'}), 400
    # A non-numeric port is a client error, not a server failure.
    try:
        forward_port = int(forward_port)
    except (TypeError, ValueError):
        return jsonify({'error': 'forward_port must be an integer'}), 400
    try:
        # Get NGINX settings
        nginx_settings = KeyValueSettings.get_value('nginx_settings')
        if not nginx_settings:
            return jsonify({'error': 'NGINX settings not configured'}), 400
        base_url = nginx_settings['url'].rstrip('/')
        # First, get the JWT token
        token_response = requests.post(
            f"{base_url}/api/tokens",
            json={
                'identity': nginx_settings['username'],
                'secret': nginx_settings['password']
            },
            headers={'Content-Type': 'application/json'},
            timeout=5
        )
        if token_response.status_code != 200:
            return jsonify({'error': 'Failed to authenticate with NGINX Proxy Manager'}), 400
        token = token_response.json().get('token')
        if not token:
            return jsonify({'error': 'No token received from NGINX Proxy Manager'}), 400
        # Create the proxy host
        proxy_host_data = {
            'domain_names': domains,
            'forward_scheme': scheme,
            'forward_host': forward_ip,
            'forward_port': forward_port,
            'ssl_forced': True,
            'caching_enabled': True,
            'block_exploits': True,
            'allow_websocket_upgrade': True,
            'http2_support': True,
            'hsts_enabled': True,
            'hsts_subdomains': True,
            'meta': {
                'letsencrypt_agree': True,
                'dns_challenge': False
            }
        }
        response = requests.post(
            f"{base_url}/api/nginx/proxy-hosts",
            json=proxy_host_data,
            headers={
                'Authorization': f'Bearer {token}',
                'Content-Type': 'application/json',
                'Accept': 'application/json'
            },
            timeout=5
        )
        # Accept any 2xx status: a create call may answer 201 Created rather
        # than 200, and the SSL endpoint in this module already checks `.ok`.
        if response.ok:
            return jsonify({
                'message': 'Proxy host created successfully',
                'data': response.json()
            })
        error_data = response.json()
        return jsonify({
            'error': f'Failed to create proxy host: {error_data.get("message", "Unknown error")}'
        }), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@admin_api.route('/create-ssl-certificate', methods=['POST'])
@csrf.exempt
@token_required
def create_ssl_certificate(current_user):
    """Request a Let's Encrypt certificate and attach it to an existing proxy host.

    Expects a JSON object with ``domains``, ``proxy_host_id`` and
    ``nginx_url``. Steps:

    1. Authenticate against the saved ``nginx_settings`` URL to get a token.
    2. ``POST <nginx_url>/api/nginx/ssl`` to create the certificate.
    3. ``PUT <nginx_url>/api/nginx/proxy-hosts/<proxy_host_id>`` to set
       ``ssl_certificate_id``.

    Every response is a JSON object with ``success``; failures also carry
    ``error``. Upstream failures are passed through with the upstream status
    code; unexpected exceptions give 500.
    """
    # Certificate issuance can take a while, so it gets a generous bound;
    # the follow-up update is a plain API call.
    ssl_timeout_seconds = 120
    update_timeout_seconds = 30
    try:
        data = request.get_json()
        current_app.logger.info(f"Received request data: {data}")
        if not isinstance(data, dict):
            return jsonify({
                'success': False,
                'error': 'Missing required fields: domains, proxy_host_id, nginx_url'
            }), 400
        domains = data.get('domains')
        proxy_host_id = data.get('proxy_host_id')
        nginx_url = data.get('nginx_url')
        current_app.logger.info(f"Extracted data - domains: {domains}, proxy_host_id: {proxy_host_id}, nginx_url: {nginx_url}")
        required = (
            ('domains', domains),
            ('proxy_host_id', proxy_host_id),
            ('nginx_url', nginx_url),
        )
        missing_fields = [name for name, value in required if not value]
        if missing_fields:
            current_app.logger.error(f"Missing required fields: {missing_fields}")
            return jsonify({
                'success': False,
                'error': f'Missing required fields: {", ".join(missing_fields)}'
            }), 400
        # NOTE(review): the bearer token obtained from the saved settings URL
        # below is sent to the caller-supplied nginx_url — confirm both are
        # always the same host.
        nginx_base = nginx_url.rstrip('/')

        # Get NGINX settings
        nginx_settings = KeyValueSettings.get_value('nginx_settings')
        if not nginx_settings:
            return jsonify({
                'success': False,
                'error': 'NGINX settings not configured'
            }), 400

        # First, get the JWT token
        token_response = requests.post(
            f"{nginx_settings['url'].rstrip('/')}/api/tokens",
            json={
                'identity': nginx_settings['username'],
                'secret': nginx_settings['password']
            },
            headers={'Content-Type': 'application/json'},
            timeout=5
        )
        if token_response.status_code != 200:
            return jsonify({
                'success': False,
                'error': 'Failed to authenticate with NGINX Proxy Manager'
            }), 400
        token = token_response.json().get('token')
        if not token:
            return jsonify({
                'success': False,
                'error': 'No token received from NGINX Proxy Manager'
            }), 400
        auth_headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }

        # Create the SSL certificate
        ssl_request_data = {
            'provider': 'letsencrypt',
            'domain_names': domains,
            'meta': {
                'letsencrypt_agree': True,
                'dns_challenge': False
            }
        }
        current_app.logger.info(f"Making SSL certificate request to {nginx_base}/api/nginx/ssl with data: {ssl_request_data}")
        ssl_response = requests.post(
            f"{nginx_base}/api/nginx/ssl",
            headers=auth_headers,
            json=ssl_request_data,
            timeout=ssl_timeout_seconds
        )
        current_app.logger.info(f"SSL certificate response status: {ssl_response.status_code}")
        current_app.logger.info(f"SSL certificate response headers: {dict(ssl_response.headers)}")
        if not ssl_response.ok:
            error_text = ssl_response.text
            current_app.logger.error(f"Failed to create SSL certificate: {error_text}")
            return jsonify({
                'success': False,
                'error': f'Failed to create SSL certificate: {error_text}'
            }), ssl_response.status_code
        ssl_data = ssl_response.json()
        current_app.logger.info(f"SSL certificate created successfully: {ssl_data}")

        # Get the certificate ID
        cert_id = ssl_data.get('id')
        if not cert_id:
            current_app.logger.error("No certificate ID received in response")
            return jsonify({
                'success': False,
                'error': 'No certificate ID received'
            }), 500

        # Update the proxy host with the certificate
        update_request_data = {
            'ssl_certificate_id': cert_id
        }
        current_app.logger.info(f"Updating proxy host {proxy_host_id} with data: {update_request_data}")
        update_response = requests.put(
            f"{nginx_base}/api/nginx/proxy-hosts/{proxy_host_id}",
            headers=auth_headers,
            json=update_request_data,
            timeout=update_timeout_seconds
        )
        current_app.logger.info(f"Update response status: {update_response.status_code}")
        current_app.logger.info(f"Update response headers: {dict(update_response.headers)}")
        if not update_response.ok:
            error_text = update_response.text
            current_app.logger.error(f"Failed to update proxy host: {error_text}")
            return jsonify({
                'success': False,
                'error': f'Failed to update proxy host: {error_text}'
            }), update_response.status_code
        update_data = update_response.json()
        current_app.logger.info(f"Proxy host updated successfully: {update_data}")
        return jsonify({
            'success': True,
            'data': {
                'certificate': ssl_data,
                'proxy_host': update_data
            }
        })
    except Exception as e:
        current_app.logger.error(f"Error in create_ssl_certificate: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500