"""Launch API blueprint.

HTTP endpoints used while launching an instance: testing and saving the
connection settings for Portainer, NGINX Proxy Manager and a Git provider
(Gitea or GitLab), creating proxy hosts and SSL certificates, downloading
``docker-compose.yml`` from the configured repository, deploying a Portainer
stack and recording the resulting instance in ``KeyValueSettings``.
"""
from flask import jsonify, request, current_app, Blueprint
from models import (
    KeyValueSettings
)
from extensions import db, csrf
from datetime import datetime
import requests
import base64
from routes.admin_api import token_required
import json

launch_api = Blueprint('launch_api', __name__)

# Upper bounds (seconds) for outbound calls that previously had no timeout at
# all; without one a stalled upstream blocks the worker indefinitely.
_GIT_TIMEOUT = 30
_NGINX_TIMEOUT = 30
# Requesting a Let's Encrypt certificate can legitimately take a while.
_NGINX_SSL_TIMEOUT = 300


def _json_body():
    """Return the request's JSON object, or ``{}`` when it is missing/invalid.

    ``request.get_json()`` can yield ``None`` (or a non-dict such as a list);
    normalising to a dict lets every handler answer with its regular
    "missing required fields" 400 instead of crashing on ``.get``.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _upstream_message(response, fallback='Unknown error'):
    """Extract the ``message`` field of an upstream error response.

    Returns ``fallback`` when the body is not JSON or is not a JSON object.
    """
    try:
        payload = response.json()
    except ValueError:
        # requests raises a ValueError subclass for undecodable bodies.
        return fallback
    if isinstance(payload, dict):
        return payload.get('message', fallback)
    return fallback


def _nginx_login(base_url, identity, secret):
    """Authenticate against NGINX Proxy Manager.

    Returns a ``(token, error)`` pair: ``token`` is the JWT on success and
    ``error`` is a human-readable message (``None`` on success).
    """
    token_response = requests.post(
        f"{base_url.rstrip('/')}/api/tokens",
        json={
            'identity': identity,
            'secret': secret
        },
        headers={'Content-Type': 'application/json'},
        timeout=5
    )
    if token_response.status_code != 200:
        return None, 'Failed to authenticate with NGINX Proxy Manager'

    token = token_response.json().get('token')
    if not token:
        return None, 'No token received from NGINX Proxy Manager'
    return token, None


def _gitea_get(base_url, path, token, params=None):
    """GET ``/api/v1/<path>`` from Gitea and return the response.

    The token is first sent in the ``Authorization`` header; if that request
    does not return 200 it is retried with the token as a ``token`` query
    parameter.
    """
    url = f"{base_url.rstrip('/')}/api/v1/{path}"
    response = requests.get(
        url,
        headers={
            'Accept': 'application/json',
            'Authorization': f'token {token}'
        },
        params=params,
        timeout=_GIT_TIMEOUT
    )
    if response.status_code != 200:
        # Fall back to query-parameter authentication.
        response = requests.get(
            url,
            headers={'Accept': 'application/json'},
            params=dict(params or {}, token=token),
            timeout=_GIT_TIMEOUT
        )
    return response


# Connection Settings
# NOTE(review): the test-* endpoints below are not protected by
# @token_required and make server-side requests to caller-supplied URLs;
# confirm that this exposure is intended.
@launch_api.route('/test-portainer-connection', methods=['POST'])
@csrf.exempt
def test_portainer_connection():
    """Check that ``url``/``api_key`` from the JSON body reach Portainer.

    Returns 200 on success, 400 on missing fields or a non-200 Portainer
    answer, and 500 if the request itself raises.
    """
    data = _json_body()
    url = data.get('url')
    api_key = data.get('api_key')

    if not url or not api_key:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        # Test Portainer connection
        response = requests.get(
            f"{url.rstrip('/')}/api/status",
            headers={
                'X-API-Key': api_key,
                'Accept': 'application/json'
            },
            timeout=5
        )
        if response.status_code == 200:
            return jsonify({'message': 'Connection successful'})
        return jsonify({'error': 'Failed to connect to Portainer'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@launch_api.route('/test-nginx-connection', methods=['POST'])
@csrf.exempt
def test_nginx_connection():
    """Check NGINX Proxy Manager credentials from the JSON body.

    Logs in with ``username``/``password`` at ``url`` and then lists proxy
    hosts with the obtained token. Returns 200 on success, 400 on missing
    fields or upstream rejection, 500 if a request raises.
    """
    data = _json_body()
    url = data.get('url')
    username = data.get('username')
    password = data.get('password')

    if not url or not username or not password:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        # First, get the JWT token
        token, error = _nginx_login(url, username, password)
        if error:
            return jsonify({'error': error}), 400

        # Now test the connection using the token
        response = requests.get(
            f"{url.rstrip('/')}/api/nginx/proxy-hosts",
            headers={
                'Authorization': f'Bearer {token}',
                'Accept': 'application/json'
            },
            timeout=5
        )
        if response.status_code == 200:
            return jsonify({'message': 'Connection successful'})
        return jsonify({'error': 'Failed to connect to NGINX Proxy Manager'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@launch_api.route('/save-portainer-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_portainer_connection(current_user):
    """Store Portainer ``url``/``api_key`` under ``portainer_settings``.

    Admin only (403 otherwise). Returns 400 on missing fields and 500 if
    saving raises.
    """
    if not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403

    data = _json_body()
    url = data.get('url')
    api_key = data.get('api_key')

    if not url or not api_key:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        # Save Portainer settings
        KeyValueSettings.set_value('portainer_settings', {
            'url': url,
            'api_key': api_key
        })
        return jsonify({'message': 'Settings saved successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@launch_api.route('/save-nginx-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_nginx_connection(current_user):
    """Store NGINX Proxy Manager credentials under ``nginx_settings``.

    Admin only (403 otherwise). Returns 400 on missing fields and 500 if
    saving raises.
    """
    if not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403

    data = _json_body()
    url = data.get('url')
    username = data.get('username')
    password = data.get('password')

    if not url or not username or not password:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        # Save NGINX Proxy Manager settings
        KeyValueSettings.set_value('nginx_settings', {
            'url': url,
            'username': username,
            'password': password
        })
        return jsonify({'message': 'Settings saved successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@launch_api.route('/generate-gitea-token', methods=['POST'])
@csrf.exempt
def generate_gitea_token():
    """Generate a new Gitea API token.

    Expects ``url``, ``username`` and ``password`` (optionally ``otp``) in
    the JSON body and creates a token via basic authentication. Returns the
    token details with 200, or a ``message`` with 400 on any failure.
    """
    data = _json_body()
    if any(key not in data for key in ('url', 'username', 'password')):
        return jsonify({'message': 'Missing required fields'}), 400

    try:
        headers = {
            'Content-Type': 'application/json'
        }
        if data.get('otp'):
            headers['X-Gitea-OTP'] = data['otp']

        # Generate token with required scopes
        token_request = {
            'name': f'docupulse_{datetime.utcnow().strftime("%Y%m%d_%H%M%S")}',
            'scopes': [
                'read:activitypub',
                'read:issue',
                'write:misc',
                'read:notification',
                'read:organization',
                'read:package',
                'read:repository',
                'read:user'
            ]
        }

        # Make request to Gitea API
        base_url = data['url'].rstrip('/')
        response = requests.post(
            f'{base_url}/api/v1/users/{data["username"]}/tokens',
            headers=headers,
            json=token_request,
            auth=(data['username'], data['password']),
            timeout=_GIT_TIMEOUT
        )

        if response.status_code == 201:
            created = response.json()
            return jsonify({
                'token': created['sha1'],
                'name': created['name'],
                'token_last_eight': created['token_last_eight']
            }), 200
        return jsonify({'message': f'Failed to generate token: {_upstream_message(response)}'}), 400
    except Exception as e:
        return jsonify({'message': f'Failed to generate token: {str(e)}'}), 400


@launch_api.route('/list-gitea-repos', methods=['POST'])
@csrf.exempt
def list_gitea_repos():
    """List repositories from Gitea.

    Expects ``url`` and ``token`` in the JSON body. Returns
    ``{'repositories': [...]}`` with 200 or a ``message`` with 400.
    """
    data = _json_body()
    if any(key not in data for key in ('url', 'token')):
        return jsonify({'message': 'Missing required fields'}), 400

    try:
        # Get user's repositories (header auth, then query-parameter auth)
        response = _gitea_get(data['url'], 'user/repos', data['token'])

        if response.status_code == 200:
            return jsonify({
                'repositories': response.json()
            }), 200
        return jsonify({'message': f'Failed to list repositories: {_upstream_message(response)}'}), 400
    except Exception as e:
        return jsonify({'message': f'Failed to list repositories: {str(e)}'}), 400


@launch_api.route('/list-gitea-branches', methods=['POST'])
@csrf.exempt
def list_gitea_branches():
    """List branches from a Gitea repository.

    Expects ``url``, ``token`` and ``repo`` in the JSON body. Returns
    ``{'branches': [...]}`` with 200 or a ``message`` with 400.
    """
    data = _json_body()
    if any(key not in data for key in ('url', 'token', 'repo')):
        return jsonify({'message': 'Missing required fields'}), 400

    try:
        # Get repository branches (header auth, then query-parameter auth)
        response = _gitea_get(
            data['url'],
            f'repos/{data["repo"]}/branches',
            data['token']
        )

        if response.status_code == 200:
            return jsonify({
                'branches': response.json()
            }), 200
        return jsonify({'message': f'Failed to list branches: {_upstream_message(response)}'}), 400
    except Exception as e:
        return jsonify({'message': f'Failed to list branches: {str(e)}'}), 400


@launch_api.route('/list-gitlab-repos', methods=['POST'])
@csrf.exempt
def list_gitlab_repos():
    """List repositories from GitLab.

    Expects ``url`` and ``token`` in the JSON body. Returns
    ``{'repositories': [...]}`` with 200 or a ``message`` with 400.
    """
    data = _json_body()
    if any(key not in data for key in ('url', 'token')):
        return jsonify({'message': 'Missing required fields'}), 400

    try:
        headers = {
            'PRIVATE-TOKEN': data['token'],
            'Accept': 'application/json'
        }

        # Get user's projects (repositories)
        base_url = data['url'].rstrip('/')
        response = requests.get(
            f'{base_url}/api/v4/projects',
            headers=headers,
            params={'membership': 'true'},  # Only get projects where user is a member
            timeout=_GIT_TIMEOUT
        )

        if response.status_code == 200:
            return jsonify({
                'repositories': response.json()
            }), 200
        return jsonify({'message': f'Failed to list repositories: {_upstream_message(response)}'}), 400
    except Exception as e:
        return jsonify({'message': f'Failed to list repositories: {str(e)}'}), 400


@launch_api.route('/test-git-connection', methods=['POST'])
@csrf.exempt
def test_git_connection():
    """Test the connection to a Git repository.

    Expects ``provider`` (``gitea`` or ``gitlab``), ``url``, ``username``,
    ``token`` and ``repo`` in the JSON body and fetches the repository's
    metadata. Returns 200 on success or a ``message`` with 400.
    """
    data = _json_body()
    required = ('provider', 'url', 'username', 'token', 'repo')
    if any(key not in data for key in required):
        return jsonify({'message': 'Missing required fields'}), 400

    try:
        if data['provider'] == 'gitea':
            # Try to get repository information (header auth, then query auth)
            response = _gitea_get(
                data['url'],
                f'repos/{data["repo"]}',
                data['token']
            )
        elif data['provider'] == 'gitlab':
            # Test GitLab connection
            headers = {
                'PRIVATE-TOKEN': data['token'],
                'Accept': 'application/json'
            }
            base_url = data['url'].rstrip('/')
            # GitLab addresses projects by URL-encoded "namespace/name".
            project = data['repo'].replace('/', '%2F')
            response = requests.get(
                f'{base_url}/api/v4/projects/{project}',
                headers=headers,
                timeout=_GIT_TIMEOUT
            )
        else:
            return jsonify({'message': 'Invalid Git provider'}), 400

        if response.status_code == 200:
            return jsonify({'message': 'Connection successful'}), 200
        return jsonify({'message': f'Connection failed: {_upstream_message(response)}'}), 400
    except Exception as e:
        return jsonify({'message': f'Connection failed: {str(e)}'}), 400


@launch_api.route('/save-git-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_git_connection(current_user):
    """Store the Git provider settings under ``git_settings``.

    Admin only (403 otherwise). Returns 400 on missing fields or an
    unsupported provider and 500 if saving raises.
    """
    if not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403

    data = _json_body()
    provider = data.get('provider')
    url = data.get('url')
    username = data.get('username')
    token = data.get('token')
    repo = data.get('repo')

    if not provider or not url or not username or not token or not repo:
        return jsonify({'error': 'Missing required fields'}), 400

    if provider not in ['gitea', 'gitlab']:
        return jsonify({'error': 'Invalid provider'}), 400

    try:
        # Save Git settings
        KeyValueSettings.set_value('git_settings', {
            'provider': provider,
            'url': url,
            'username': username,
            'token': token,
            'repo': repo
        })
        return jsonify({'message': 'Settings saved successfully'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@launch_api.route('/create-proxy-host', methods=['POST'])
@csrf.exempt
@token_required
def create_proxy_host(current_user):
    """Create a proxy host in the configured NGINX Proxy Manager.

    Admin only (403 otherwise). Expects ``domains``, ``forward_ip`` and
    ``forward_port`` (optionally ``scheme``, default ``http``) in the JSON
    body. Returns the created host with 200, 400 on invalid input or
    upstream rejection, 500 if a request raises.
    """
    if not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403

    data = _json_body()
    domains = data.get('domains')
    scheme = data.get('scheme', 'http')
    forward_ip = data.get('forward_ip')
    forward_port = data.get('forward_port')

    if not domains or not forward_ip or not forward_port:
        return jsonify({'error': 'Missing required fields'}), 400

    # A malformed port is a client error, not a server failure.
    try:
        port_number = int(forward_port)
    except (TypeError, ValueError):
        return jsonify({'error': 'Invalid forward_port'}), 400

    try:
        # Get NGINX settings
        nginx_settings = KeyValueSettings.get_value('nginx_settings')
        if not nginx_settings:
            return jsonify({'error': 'NGINX settings not configured'}), 400

        # First, get the JWT token
        token, error = _nginx_login(
            nginx_settings['url'],
            nginx_settings['username'],
            nginx_settings['password']
        )
        if error:
            return jsonify({'error': error}), 400

        # Create the proxy host
        proxy_host_data = {
            'domain_names': domains,
            'forward_scheme': scheme,
            'forward_host': forward_ip,
            'forward_port': port_number,
            'ssl_forced': True,
            'caching_enabled': True,
            'block_exploits': True,
            'allow_websocket_upgrade': True,
            'http2_support': True,
            'hsts_enabled': True,
            'hsts_subdomains': True,
            'meta': {
                'letsencrypt_agree': True,
                'dns_challenge': False
            }
        }

        response = requests.post(
            f"{nginx_settings['url'].rstrip('/')}/api/nginx/proxy-hosts",
            json=proxy_host_data,
            headers={
                'Authorization': f'Bearer {token}',
                'Content-Type': 'application/json',
                'Accept': 'application/json'
            },
            timeout=5
        )

        # Accept 201 Created as well as 200, in line with the ``.ok`` check
        # used for the other NGINX Proxy Manager write calls in this module.
        if response.status_code in (200, 201):
            return jsonify({
                'message': 'Proxy host created successfully',
                'data': response.json()
            })
        return jsonify({
            'error': f'Failed to create proxy host: {_upstream_message(response)}'
        }), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@launch_api.route('/create-ssl-certificate', methods=['POST'])
@csrf.exempt
@token_required
def create_ssl_certificate(current_user):
    """Request a Let's Encrypt certificate and attach it to a proxy host.

    Admin only (403 otherwise), like the other endpoints that modify NGINX
    Proxy Manager. Expects ``domains``, ``proxy_host_id`` and ``nginx_url``
    in the JSON body. ``nginx_url`` is still required for backward
    compatibility, but requests are sent to the *configured* NGINX Proxy
    Manager URL: the bearer token is issued by that server and must not be
    forwarded to a caller-supplied address.

    Returns ``{'success': True, 'data': {...}}`` on success; on failure
    ``{'success': False, 'error': ...}`` with 400, the upstream status code,
    or 500.
    """
    if not current_user.is_admin:
        return jsonify({
            'success': False,
            'error': 'Unauthorized'
        }), 403

    try:
        data = _json_body()
        current_app.logger.info(f"Received request data: {data}")

        domains = data.get('domains')
        proxy_host_id = data.get('proxy_host_id')
        nginx_url = data.get('nginx_url')

        current_app.logger.info(f"Extracted data - domains: {domains}, proxy_host_id: {proxy_host_id}, nginx_url: {nginx_url}")

        supplied = {
            'domains': domains,
            'proxy_host_id': proxy_host_id,
            'nginx_url': nginx_url
        }
        missing_fields = [name for name, value in supplied.items() if not value]
        if missing_fields:
            current_app.logger.error(f"Missing required fields: {missing_fields}")
            return jsonify({
                'success': False,
                'error': f'Missing required fields: {", ".join(missing_fields)}'
            }), 400

        # Get NGINX settings
        nginx_settings = KeyValueSettings.get_value('nginx_settings')
        if not nginx_settings:
            return jsonify({
                'success': False,
                'error': 'NGINX settings not configured'
            }), 400

        # First, get the JWT token
        token, error = _nginx_login(
            nginx_settings['url'],
            nginx_settings['username'],
            nginx_settings['password']
        )
        if error:
            return jsonify({
                'success': False,
                'error': error
            }), 400

        # The token is only valid for (and only safe to send to) the server
        # that issued it.
        base_url = nginx_settings['url'].rstrip('/')
        auth_headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }

        # Create the SSL certificate
        ssl_request_data = {
            'provider': 'letsencrypt',
            'domain_names': domains,
            'meta': {
                'letsencrypt_agree': True,
                'dns_challenge': False
            }
        }

        current_app.logger.info(f"Making SSL certificate request to {base_url}/api/nginx/ssl with data: {ssl_request_data}")

        ssl_response = requests.post(
            f"{base_url}/api/nginx/ssl",
            headers=auth_headers,
            json=ssl_request_data,
            timeout=_NGINX_SSL_TIMEOUT
        )

        current_app.logger.info(f"SSL certificate response status: {ssl_response.status_code}")
        current_app.logger.info(f"SSL certificate response headers: {dict(ssl_response.headers)}")

        if not ssl_response.ok:
            error_text = ssl_response.text
            current_app.logger.error(f"Failed to create SSL certificate: {error_text}")
            return jsonify({
                'success': False,
                'error': f'Failed to create SSL certificate: {error_text}'
            }), ssl_response.status_code

        ssl_data = ssl_response.json()
        current_app.logger.info(f"SSL certificate created successfully: {ssl_data}")

        # Get the certificate ID
        cert_id = ssl_data.get('id')
        if not cert_id:
            current_app.logger.error("No certificate ID received in response")
            return jsonify({
                'success': False,
                'error': 'No certificate ID received'
            }), 500

        # Update the proxy host with the certificate
        update_request_data = {
            'ssl_certificate_id': cert_id
        }

        current_app.logger.info(f"Updating proxy host {proxy_host_id} with data: {update_request_data}")

        update_response = requests.put(
            f"{base_url}/api/nginx/proxy-hosts/{proxy_host_id}",
            headers=auth_headers,
            json=update_request_data,
            timeout=_NGINX_TIMEOUT
        )

        current_app.logger.info(f"Update response status: {update_response.status_code}")
        current_app.logger.info(f"Update response headers: {dict(update_response.headers)}")

        if not update_response.ok:
            error_text = update_response.text
            current_app.logger.error(f"Failed to update proxy host: {error_text}")
            return jsonify({
                'success': False,
                'error': f'Failed to update proxy host: {error_text}'
            }), update_response.status_code

        update_data = update_response.json()
        current_app.logger.info(f"Proxy host updated successfully: {update_data}")

        return jsonify({
            'success': True,
            'data': {
                'certificate': ssl_data,
                'proxy_host': update_data
            }
        })

    except Exception as e:
        current_app.logger.error(f"Error in create_ssl_certificate: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500


# NOTE(review): this endpoint uses the stored Git token but is not protected
# by @token_required; confirm whether that is intended.
@launch_api.route('/download-docker-compose', methods=['POST'])
@csrf.exempt
def download_docker_compose():
    """Download docker-compose.yml from the repository.

    Expects ``repository`` and ``branch`` in the JSON body and uses the
    stored ``git_settings``. Returns ``{'success': True, 'content': ...}``
    on success, a ``message`` with 400 on bad input or upstream rejection,
    and 500 if a request raises.
    """
    data = _json_body()
    if any(key not in data for key in ('repository', 'branch')):
        return jsonify({'message': 'Missing required fields'}), 400

    try:
        # Get Git settings
        git_settings = KeyValueSettings.get_value('git_settings')
        if not git_settings:
            return jsonify({'message': 'Git settings not configured'}), 400

        provider = git_settings['provider']

        # Determine the provider and set up the appropriate API call
        if provider == 'gitea':
            # Try to get the file content (header auth, then query auth)
            response = _gitea_get(
                git_settings['url'],
                f'repos/{data["repository"]}/contents/docker-compose.yml',
                git_settings['token'],
                params={'ref': data['branch']}
            )
        elif provider == 'gitlab':
            headers = {
                'PRIVATE-TOKEN': git_settings['token'],
                'Accept': 'application/json'
            }
            base_url = git_settings['url'].rstrip('/')
            # GitLab addresses projects by URL-encoded "namespace/name".
            project = data['repository'].replace('/', '%2F')
            response = requests.get(
                f'{base_url}/api/v4/projects/{project}/repository/files/docker-compose.yml/raw',
                headers=headers,
                params={'ref': data['branch']},
                timeout=_GIT_TIMEOUT
            )
        else:
            return jsonify({'message': 'Unsupported Git provider'}), 400

        if response.status_code != 200:
            return jsonify({
                'message': f'Failed to download docker-compose.yml: {_upstream_message(response)}'
            }), 400

        if provider == 'gitea':
            # Gitea returns the file content base64-encoded inside JSON.
            content = base64.b64decode(response.json()['content']).decode('utf-8')
        else:
            content = response.text

        return jsonify({
            'success': True,
            'content': content
        })

    except Exception as e:
        current_app.logger.error(f"Error downloading docker-compose.yml: {str(e)}")
        return jsonify({'message': f'Error downloading docker-compose.yml: {str(e)}'}), 500


# NOTE(review): this endpoint deploys with the stored Portainer API key but
# is not protected by @token_required; confirm whether that is intended.
@launch_api.route('/deploy-stack', methods=['POST'])
@csrf.exempt
def deploy_stack():
    """Deploy a stack to Portainer, or report an existing one with that name.

    Expects at least ``name`` and ``StackFileContent`` in the JSON body; the
    whole body is forwarded to Portainer as the stack definition. The first
    endpoint returned by Portainer is used as the deployment target.

    Returns ``{'name', 'id', 'status'}`` where ``status`` is ``existing`` or
    ``created``; 400/401/500 on failures and 504 on timeout.
    """
    try:
        data = _json_body()
        if any(key not in data for key in ('name', 'StackFileContent')):
            return jsonify({'error': 'Missing required fields'}), 400

        # Get Portainer settings
        portainer_settings = KeyValueSettings.get_value('portainer_settings')
        if not portainer_settings:
            return jsonify({'error': 'Portainer settings not configured'}), 400

        portainer_url = portainer_settings['url'].rstrip('/')
        api_key = portainer_settings['api_key']
        read_headers = {
            'X-API-Key': api_key,
            'Accept': 'application/json'
        }

        # Verify Portainer authentication
        auth_response = requests.get(
            f"{portainer_url}/api/status",
            headers=read_headers,
            timeout=30  # 30 seconds timeout for status check
        )

        if not auth_response.ok:
            current_app.logger.error(f"Portainer authentication failed: {auth_response.text}")
            return jsonify({'error': 'Failed to authenticate with Portainer'}), 401

        # Get Portainer endpoint ID (assuming it's the first endpoint)
        endpoint_response = requests.get(
            f"{portainer_url}/api/endpoints",
            headers=read_headers,
            timeout=30  # 30 seconds timeout for endpoint check
        )
        if not endpoint_response.ok:
            error_text = _upstream_message(endpoint_response, endpoint_response.text)
            return jsonify({'error': f'Failed to get Portainer endpoints: {error_text}'}), 500

        endpoints = endpoint_response.json()
        if not endpoints:
            return jsonify({'error': 'No Portainer endpoints found'}), 400

        endpoint_id = endpoints[0]['Id']

        # Log the request data
        current_app.logger.info(f"Creating stack with data: {json.dumps(data)}")
        current_app.logger.info(f"Using endpoint ID: {endpoint_id}")

        # First, check if a stack with this name already exists
        stacks_response = requests.get(
            f"{portainer_url}/api/stacks",
            headers=read_headers,
            params={'filters': json.dumps({'Name': data['name']})},
            timeout=30
        )

        if stacks_response.ok:
            for stack in stacks_response.json():
                if stack['Name'] == data['name']:
                    current_app.logger.info(f"Found existing stack: {stack['Name']} (ID: {stack['Id']})")
                    return jsonify({
                        'name': stack['Name'],
                        'id': stack['Id'],
                        'status': 'existing'
                    })

        # If no existing stack found, proceed with creation
        url = f"{portainer_url}/api/stacks/create/standalone/string"
        current_app.logger.info(f"Making request to: {url}")

        # Stack creation can be slow, so allow up to 10 minutes.
        create_response = requests.post(
            url,
            headers={
                'X-API-Key': api_key,
                'Content-Type': 'application/json',
                'Accept': 'application/json'
            },
            params={'endpointId': endpoint_id},  # endpointId is a query parameter
            json=data,  # the request body is forwarded unchanged
            timeout=600
        )

        # Log the response details
        current_app.logger.info(f"Response status: {create_response.status_code}")
        current_app.logger.info(f"Response headers: {dict(create_response.headers)}")
        response_text = create_response.text
        current_app.logger.info(f"Response body: {response_text}")

        if not create_response.ok:
            error_message = _upstream_message(create_response, response_text)
            return jsonify({'error': f'Failed to create stack: {error_message}'}), 500

        stack_info = create_response.json()
        return jsonify({
            'name': stack_info['Name'],
            'id': stack_info['Id'],
            'status': 'created'
        })

    except requests.exceptions.Timeout:
        current_app.logger.error("Request timed out while deploying stack")
        return jsonify({'error': 'Request timed out while deploying stack. The operation may still be in progress.'}), 504
    except Exception as e:
        current_app.logger.error(f"Error deploying stack: {str(e)}")
        return jsonify({'error': str(e)}), 500


# NOTE(review): this endpoint writes to KeyValueSettings but is not protected
# by @token_required; confirm whether that is intended.
@launch_api.route('/save-instance', methods=['POST'])
@csrf.exempt
def save_instance():
    """Persist the description of a launched instance.

    Expects ``name``, ``port``, ``domains``, ``stack_id``, ``stack_name``,
    ``status``, ``repository`` and ``branch`` in the JSON body and stores
    them, plus a UTC ``created_at`` timestamp, under ``instance_<name>``.
    Returns the stored data, 400 on invalid input, 500 if saving raises.
    """
    try:
        if not request.is_json:
            return jsonify({'error': 'Request must be JSON'}), 400

        data = _json_body()
        required_fields = ['name', 'port', 'domains', 'stack_id', 'stack_name', 'status', 'repository', 'branch']

        missing_fields = [field for field in required_fields if field not in data]
        if missing_fields:
            return jsonify({'error': f'Missing required fields: {", ".join(missing_fields)}'}), 400

        # Save instance data
        instance_data = {field: data[field] for field in required_fields}
        instance_data['created_at'] = datetime.utcnow().isoformat()

        # Save to database using KeyValueSettings
        KeyValueSettings.set_value(f'instance_{data["name"]}', instance_data)

        return jsonify({
            'message': 'Instance data saved successfully',
            'data': instance_data
        })

    except Exception as e:
        current_app.logger.error(f"Error saving instance data: {str(e)}")
        return jsonify({'error': str(e)}), 500