Split files

This commit is contained in:
2025-06-16 08:42:02 +02:00
parent 15f69f533a
commit 64569c3505
7 changed files with 1759 additions and 1714 deletions

View File

@@ -11,9 +11,6 @@ import jwt
from werkzeug.security import generate_password_hash
import secrets
from flask_login import login_user
import requests
import json
import base64
admin_api = Blueprint('admin_api', __name__)
@@ -529,705 +526,4 @@ def resend_setup_mail(current_user, user_id):
db.session.add(mail)
db.session.commit()
return jsonify({'message': 'Setup mail queued for resending'})
# Connection Settings
@admin_api.route('/test-portainer-connection', methods=['POST'])
@csrf.exempt
def test_portainer_connection():
data = request.get_json()
url = data.get('url')
api_key = data.get('api_key')
if not url or not api_key:
return jsonify({'error': 'Missing required fields'}), 400
try:
# Test Portainer connection
response = requests.get(
f"{url.rstrip('/')}/api/status",
headers={
'X-API-Key': api_key,
'Accept': 'application/json'
},
timeout=5
)
if response.status_code == 200:
return jsonify({'message': 'Connection successful'})
else:
return jsonify({'error': 'Failed to connect to Portainer'}), 400
except Exception as e:
return jsonify({'error': str(e)}), 500
@admin_api.route('/test-nginx-connection', methods=['POST'])
@csrf.exempt
def test_nginx_connection():
data = request.get_json()
url = data.get('url')
username = data.get('username')
password = data.get('password')
if not url or not username or not password:
return jsonify({'error': 'Missing required fields'}), 400
try:
# First, get the JWT token
token_response = requests.post(
f"{url.rstrip('/')}/api/tokens",
json={
'identity': username,
'secret': password
},
headers={'Content-Type': 'application/json'},
timeout=5
)
if token_response.status_code != 200:
return jsonify({'error': 'Failed to authenticate with NGINX Proxy Manager'}), 400
token_data = token_response.json()
token = token_data.get('token')
if not token:
return jsonify({'error': 'No token received from NGINX Proxy Manager'}), 400
# Now test the connection using the token
response = requests.get(
f"{url.rstrip('/')}/api/nginx/proxy-hosts",
headers={
'Authorization': f'Bearer {token}',
'Accept': 'application/json'
},
timeout=5
)
if response.status_code == 200:
return jsonify({'message': 'Connection successful'})
else:
return jsonify({'error': 'Failed to connect to NGINX Proxy Manager'}), 400
except Exception as e:
return jsonify({'error': str(e)}), 500
@admin_api.route('/save-portainer-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_portainer_connection(current_user):
if not current_user.is_admin:
return jsonify({'error': 'Unauthorized'}), 403
data = request.get_json()
url = data.get('url')
api_key = data.get('api_key')
if not url or not api_key:
return jsonify({'error': 'Missing required fields'}), 400
try:
# Save Portainer settings
KeyValueSettings.set_value('portainer_settings', {
'url': url,
'api_key': api_key
})
return jsonify({'message': 'Settings saved successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@admin_api.route('/save-nginx-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_nginx_connection(current_user):
if not current_user.is_admin:
return jsonify({'error': 'Unauthorized'}), 403
data = request.get_json()
url = data.get('url')
username = data.get('username')
password = data.get('password')
if not url or not username or not password:
return jsonify({'error': 'Missing required fields'}), 400
try:
# Save NGINX Proxy Manager settings
KeyValueSettings.set_value('nginx_settings', {
'url': url,
'username': username,
'password': password
})
return jsonify({'message': 'Settings saved successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@admin_api.route('/generate-gitea-token', methods=['POST'])
@csrf.exempt
def generate_gitea_token():
"""Generate a new Gitea API token"""
data = request.get_json()
if not data or 'url' not in data or 'username' not in data or 'password' not in data:
return jsonify({'message': 'Missing required fields'}), 400
try:
headers = {
'Content-Type': 'application/json'
}
if data.get('otp'):
headers['X-Gitea-OTP'] = data['otp']
# Generate token with required scopes
token_data = {
'name': f'docupulse_{datetime.utcnow().strftime("%Y%m%d_%H%M%S")}',
'scopes': [
'read:activitypub',
'read:issue',
'write:misc',
'read:notification',
'read:organization',
'read:package',
'read:repository',
'read:user'
]
}
# Make request to Gitea API
response = requests.post(
f'{data["url"]}/api/v1/users/{data["username"]}/tokens',
headers=headers,
json=token_data,
auth=(data['username'], data['password'])
)
if response.status_code == 201:
token_data = response.json()
return jsonify({
'token': token_data['sha1'],
'name': token_data['name'],
'token_last_eight': token_data['token_last_eight']
}), 200
else:
return jsonify({'message': f'Failed to generate token: {response.json().get("message", "Unknown error")}'}), 400
except Exception as e:
return jsonify({'message': f'Failed to generate token: {str(e)}'}), 400
@admin_api.route('/list-gitea-repos', methods=['POST'])
@csrf.exempt
def list_gitea_repos():
"""List repositories from Gitea"""
data = request.get_json()
if not data or 'url' not in data or 'token' not in data:
return jsonify({'message': 'Missing required fields'}), 400
try:
# Try different authentication methods
headers = {
'Accept': 'application/json'
}
# First try token in Authorization header
headers['Authorization'] = f'token {data["token"]}'
# Get user's repositories
response = requests.get(
f'{data["url"]}/api/v1/user/repos',
headers=headers
)
# If that fails, try token as query parameter
if response.status_code != 200:
response = requests.get(
f'{data["url"]}/api/v1/user/repos?token={data["token"]}',
headers={'Accept': 'application/json'}
)
if response.status_code == 200:
return jsonify({
'repositories': response.json()
}), 200
else:
return jsonify({'message': f'Failed to list repositories: {response.json().get("message", "Unknown error")}'}), 400
except Exception as e:
return jsonify({'message': f'Failed to list repositories: {str(e)}'}), 400
@admin_api.route('/list-gitea-branches', methods=['POST'])
@csrf.exempt
def list_gitea_branches():
"""List branches from a Gitea repository"""
data = request.get_json()
if not data or 'url' not in data or 'token' not in data or 'repo' not in data:
return jsonify({'message': 'Missing required fields'}), 400
try:
# Try different authentication methods
headers = {
'Accept': 'application/json'
}
# First try token in Authorization header
headers['Authorization'] = f'token {data["token"]}'
# Get repository branches
response = requests.get(
f'{data["url"]}/api/v1/repos/{data["repo"]}/branches',
headers=headers
)
# If that fails, try token as query parameter
if response.status_code != 200:
response = requests.get(
f'{data["url"]}/api/v1/repos/{data["repo"]}/branches?token={data["token"]}',
headers={'Accept': 'application/json'}
)
if response.status_code == 200:
return jsonify({
'branches': response.json()
}), 200
else:
return jsonify({'message': f'Failed to list branches: {response.json().get("message", "Unknown error")}'}), 400
except Exception as e:
return jsonify({'message': f'Failed to list branches: {str(e)}'}), 400
@admin_api.route('/list-gitlab-repos', methods=['POST'])
@csrf.exempt
def list_gitlab_repos():
"""List repositories from GitLab"""
data = request.get_json()
if not data or 'url' not in data or 'token' not in data:
return jsonify({'message': 'Missing required fields'}), 400
try:
headers = {
'PRIVATE-TOKEN': data['token'],
'Accept': 'application/json'
}
# Get user's projects (repositories)
response = requests.get(
f'{data["url"]}/api/v4/projects',
headers=headers,
params={'membership': 'true'} # Only get projects where user is a member
)
if response.status_code == 200:
return jsonify({
'repositories': response.json()
}), 200
else:
return jsonify({'message': f'Failed to list repositories: {response.json().get("message", "Unknown error")}'}), 400
except Exception as e:
return jsonify({'message': f'Failed to list repositories: {str(e)}'}), 400
@admin_api.route('/test-git-connection', methods=['POST'])
@csrf.exempt
def test_git_connection():
"""Test the connection to a Git repository"""
data = request.get_json()
if not data or 'provider' not in data or 'url' not in data or 'username' not in data or 'token' not in data or 'repo' not in data:
return jsonify({'message': 'Missing required fields'}), 400
try:
if data['provider'] == 'gitea':
# Test Gitea connection with different authentication methods
headers = {
'Accept': 'application/json'
}
# First try token in Authorization header
headers['Authorization'] = f'token {data["token"]}'
# Try to get repository information
response = requests.get(
f'{data["url"]}/api/v1/repos/{data["repo"]}',
headers=headers
)
# If that fails, try token as query parameter
if response.status_code != 200:
response = requests.get(
f'{data["url"]}/api/v1/repos/{data["repo"]}?token={data["token"]}',
headers={'Accept': 'application/json'}
)
if response.status_code == 200:
return jsonify({'message': 'Connection successful'}), 200
else:
return jsonify({'message': f'Connection failed: {response.json().get("message", "Unknown error")}'}), 400
elif data['provider'] == 'gitlab':
# Test GitLab connection
headers = {
'PRIVATE-TOKEN': data['token'],
'Accept': 'application/json'
}
# Try to get repository information
response = requests.get(
f'{data["url"]}/api/v4/projects/{data["repo"].replace("/", "%2F")}',
headers=headers
)
if response.status_code == 200:
return jsonify({'message': 'Connection successful'}), 200
else:
return jsonify({'message': f'Connection failed: {response.json().get("message", "Unknown error")}'}), 400
else:
return jsonify({'message': 'Invalid Git provider'}), 400
except Exception as e:
return jsonify({'message': f'Connection failed: {str(e)}'}), 400
@admin_api.route('/save-git-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_git_connection(current_user):
if not current_user.is_admin:
return jsonify({'error': 'Unauthorized'}), 403
data = request.get_json()
provider = data.get('provider')
url = data.get('url')
username = data.get('username')
token = data.get('token')
repo = data.get('repo')
if not provider or not url or not username or not token or not repo:
return jsonify({'error': 'Missing required fields'}), 400
if provider not in ['gitea', 'gitlab']:
return jsonify({'error': 'Invalid provider'}), 400
try:
# Save Git settings
KeyValueSettings.set_value('git_settings', {
'provider': provider,
'url': url,
'username': username,
'token': token,
'repo': repo
})
return jsonify({'message': 'Settings saved successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@admin_api.route('/create-proxy-host', methods=['POST'])
@csrf.exempt
@token_required
def create_proxy_host(current_user):
if not current_user.is_admin:
return jsonify({'error': 'Unauthorized'}), 403
data = request.get_json()
domains = data.get('domains')
scheme = data.get('scheme', 'http')
forward_ip = data.get('forward_ip')
forward_port = data.get('forward_port')
if not domains or not forward_ip or not forward_port:
return jsonify({'error': 'Missing required fields'}), 400
try:
# Get NGINX settings
nginx_settings = KeyValueSettings.get_value('nginx_settings')
if not nginx_settings:
return jsonify({'error': 'NGINX settings not configured'}), 400
# First, get the JWT token
token_response = requests.post(
f"{nginx_settings['url'].rstrip('/')}/api/tokens",
json={
'identity': nginx_settings['username'],
'secret': nginx_settings['password']
},
headers={'Content-Type': 'application/json'},
timeout=5
)
if token_response.status_code != 200:
return jsonify({'error': 'Failed to authenticate with NGINX Proxy Manager'}), 400
token_data = token_response.json()
token = token_data.get('token')
if not token:
return jsonify({'error': 'No token received from NGINX Proxy Manager'}), 400
# Create the proxy host
proxy_host_data = {
'domain_names': domains,
'forward_scheme': scheme,
'forward_host': forward_ip,
'forward_port': int(forward_port),
'ssl_forced': True,
'caching_enabled': True,
'block_exploits': True,
'allow_websocket_upgrade': True,
'http2_support': True,
'hsts_enabled': True,
'hsts_subdomains': True,
'meta': {
'letsencrypt_agree': True,
'dns_challenge': False
}
}
response = requests.post(
f"{nginx_settings['url'].rstrip('/')}/api/nginx/proxy-hosts",
json=proxy_host_data,
headers={
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
},
timeout=5
)
if response.status_code == 200:
return jsonify({
'message': 'Proxy host created successfully',
'data': response.json()
})
else:
error_data = response.json()
return jsonify({
'error': f'Failed to create proxy host: {error_data.get("message", "Unknown error")}'
}), 400
except Exception as e:
return jsonify({'error': str(e)}), 500
@admin_api.route('/create-ssl-certificate', methods=['POST'])
@csrf.exempt
@token_required
def create_ssl_certificate(current_user):
try:
data = request.get_json()
current_app.logger.info(f"Received request data: {data}")
domains = data.get('domains')
proxy_host_id = data.get('proxy_host_id')
nginx_url = data.get('nginx_url')
current_app.logger.info(f"Extracted data - domains: {domains}, proxy_host_id: {proxy_host_id}, nginx_url: {nginx_url}")
if not all([domains, proxy_host_id, nginx_url]):
missing_fields = []
if not domains: missing_fields.append('domains')
if not proxy_host_id: missing_fields.append('proxy_host_id')
if not nginx_url: missing_fields.append('nginx_url')
current_app.logger.error(f"Missing required fields: {missing_fields}")
return jsonify({
'success': False,
'error': f'Missing required fields: {", ".join(missing_fields)}'
}), 400
# Get NGINX settings
nginx_settings = KeyValueSettings.get_value('nginx_settings')
if not nginx_settings:
return jsonify({
'success': False,
'error': 'NGINX settings not configured'
}), 400
# First, get the JWT token
token_response = requests.post(
f"{nginx_settings['url'].rstrip('/')}/api/tokens",
json={
'identity': nginx_settings['username'],
'secret': nginx_settings['password']
},
headers={'Content-Type': 'application/json'},
timeout=5
)
if token_response.status_code != 200:
return jsonify({
'success': False,
'error': 'Failed to authenticate with NGINX Proxy Manager'
}), 400
token_data = token_response.json()
token = token_data.get('token')
if not token:
return jsonify({
'success': False,
'error': 'No token received from NGINX Proxy Manager'
}), 400
# Create the SSL certificate
ssl_request_data = {
'provider': 'letsencrypt',
'domain_names': domains,
'meta': {
'letsencrypt_agree': True,
'dns_challenge': False
}
}
current_app.logger.info(f"Making SSL certificate request to {nginx_url}/api/nginx/ssl with data: {ssl_request_data}")
ssl_response = requests.post(
f"{nginx_url}/api/nginx/ssl",
headers={
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
},
json=ssl_request_data
)
current_app.logger.info(f"SSL certificate response status: {ssl_response.status_code}")
current_app.logger.info(f"SSL certificate response headers: {dict(ssl_response.headers)}")
if not ssl_response.ok:
error_text = ssl_response.text
current_app.logger.error(f"Failed to create SSL certificate: {error_text}")
return jsonify({
'success': False,
'error': f'Failed to create SSL certificate: {error_text}'
}), ssl_response.status_code
ssl_data = ssl_response.json()
current_app.logger.info(f"SSL certificate created successfully: {ssl_data}")
# Get the certificate ID
cert_id = ssl_data.get('id')
if not cert_id:
current_app.logger.error("No certificate ID received in response")
return jsonify({
'success': False,
'error': 'No certificate ID received'
}), 500
# Update the proxy host with the certificate
update_request_data = {
'ssl_certificate_id': cert_id
}
current_app.logger.info(f"Updating proxy host {proxy_host_id} with data: {update_request_data}")
update_response = requests.put(
f"{nginx_url}/api/nginx/proxy-hosts/{proxy_host_id}",
headers={
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
},
json=update_request_data
)
current_app.logger.info(f"Update response status: {update_response.status_code}")
current_app.logger.info(f"Update response headers: {dict(update_response.headers)}")
if not update_response.ok:
error_text = update_response.text
current_app.logger.error(f"Failed to update proxy host: {error_text}")
return jsonify({
'success': False,
'error': f'Failed to update proxy host: {error_text}'
}), update_response.status_code
update_data = update_response.json()
current_app.logger.info(f"Proxy host updated successfully: {update_data}")
return jsonify({
'success': True,
'data': {
'certificate': ssl_data,
'proxy_host': update_data
}
})
except Exception as e:
current_app.logger.error(f"Error in create_ssl_certificate: {str(e)}")
return jsonify({
'success': False,
'error': str(e)
}), 500
@admin_api.route('/download-docker-compose', methods=['POST'])
@csrf.exempt
def download_docker_compose():
"""Download docker-compose.yml from the repository"""
data = request.get_json()
if not data or 'repository' not in data or 'branch' not in data:
return jsonify({'message': 'Missing required fields'}), 400
try:
# Get Git settings
git_settings = KeyValueSettings.get_value('git_settings')
if not git_settings:
return jsonify({'message': 'Git settings not configured'}), 400
# Determine the provider and set up the appropriate API call
if git_settings['provider'] == 'gitea':
# For Gitea
headers = {
'Accept': 'application/json',
'Authorization': f'token {git_settings["token"]}'
}
# Try to get the file content
response = requests.get(
f'{git_settings["url"]}/api/v1/repos/{data["repository"]}/contents/docker-compose.yml',
headers=headers,
params={'ref': data['branch']}
)
if response.status_code != 200:
# Try token as query parameter if header auth fails
response = requests.get(
f'{git_settings["url"]}/api/v1/repos/{data["repository"]}/contents/docker-compose.yml?token={git_settings["token"]}',
headers={'Accept': 'application/json'},
params={'ref': data['branch']}
)
elif git_settings['provider'] == 'gitlab':
# For GitLab
headers = {
'PRIVATE-TOKEN': git_settings['token'],
'Accept': 'application/json'
}
# Get the file content
response = requests.get(
f'{git_settings["url"]}/api/v4/projects/{data["repository"].replace("/", "%2F")}/repository/files/docker-compose.yml/raw',
headers=headers,
params={'ref': data['branch']}
)
else:
return jsonify({'message': 'Unsupported Git provider'}), 400
if response.status_code == 200:
# For Gitea, we need to decode the content from base64
if git_settings['provider'] == 'gitea':
content = base64.b64decode(response.json()['content']).decode('utf-8')
else:
content = response.text
return jsonify({
'success': True,
'content': content
})
else:
return jsonify({
'message': f'Failed to download docker-compose.yml: {response.json().get("message", "Unknown error")}'
}), 400
except Exception as e:
current_app.logger.error(f"Error downloading docker-compose.yml: {str(e)}")
return jsonify({'message': f'Error downloading docker-compose.yml: {str(e)}'}), 500
return jsonify({'message': 'Setup mail queued for resending'})