Files
docupulse/routes/launch_api.py
2025-06-20 12:51:37 +02:00

1193 lines
42 KiB
Python

from flask import jsonify, request, current_app, Blueprint
from models import (
KeyValueSettings,
Instance
)
from extensions import db, csrf
from datetime import datetime
import requests
import base64
from routes.admin_api import token_required
import json
launch_api = Blueprint('launch_api', __name__)
# Connection Settings
@launch_api.route('/test-portainer-connection', methods=['POST'])
@csrf.exempt
def test_portainer_connection():
data = request.get_json()
url = data.get('url')
api_key = data.get('api_key')
if not url or not api_key:
return jsonify({'error': 'Missing required fields'}), 400
try:
# Test Portainer connection
response = requests.get(
f"{url.rstrip('/')}/api/status",
headers={
'X-API-Key': api_key,
'Accept': 'application/json'
},
timeout=5
)
if response.status_code == 200:
return jsonify({'message': 'Connection successful'})
else:
return jsonify({'error': 'Failed to connect to Portainer'}), 400
except Exception as e:
return jsonify({'error': str(e)}), 500
@launch_api.route('/test-nginx-connection', methods=['POST'])
@csrf.exempt
def test_nginx_connection():
data = request.get_json()
url = data.get('url')
username = data.get('username')
password = data.get('password')
if not url or not username or not password:
return jsonify({'error': 'Missing required fields'}), 400
try:
# First, get the JWT token
token_response = requests.post(
f"{url.rstrip('/')}/api/tokens",
json={
'identity': username,
'secret': password
},
headers={'Content-Type': 'application/json'},
timeout=5
)
if token_response.status_code != 200:
return jsonify({'error': 'Failed to authenticate with NGINX Proxy Manager'}), 400
token_data = token_response.json()
token = token_data.get('token')
if not token:
return jsonify({'error': 'No token received from NGINX Proxy Manager'}), 400
# Now test the connection using the token
response = requests.get(
f"{url.rstrip('/')}/api/nginx/proxy-hosts",
headers={
'Authorization': f'Bearer {token}',
'Accept': 'application/json'
},
timeout=5
)
if response.status_code == 200:
return jsonify({'message': 'Connection successful'})
else:
return jsonify({'error': 'Failed to connect to NGINX Proxy Manager'}), 400
except Exception as e:
return jsonify({'error': str(e)}), 500
@launch_api.route('/save-portainer-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_portainer_connection(current_user):
if not current_user.is_admin:
return jsonify({'error': 'Unauthorized'}), 403
data = request.get_json()
url = data.get('url')
api_key = data.get('api_key')
if not url or not api_key:
return jsonify({'error': 'Missing required fields'}), 400
try:
# Save Portainer settings
KeyValueSettings.set_value('portainer_settings', {
'url': url,
'api_key': api_key
})
return jsonify({'message': 'Settings saved successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@launch_api.route('/save-nginx-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_nginx_connection(current_user):
if not current_user.is_admin:
return jsonify({'error': 'Unauthorized'}), 403
data = request.get_json()
url = data.get('url')
username = data.get('username')
password = data.get('password')
if not url or not username or not password:
return jsonify({'error': 'Missing required fields'}), 400
try:
# Save NGINX Proxy Manager settings
KeyValueSettings.set_value('nginx_settings', {
'url': url,
'username': username,
'password': password
})
return jsonify({'message': 'Settings saved successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@launch_api.route('/generate-gitea-token', methods=['POST'])
@csrf.exempt
def generate_gitea_token():
"""Generate a new Gitea API token"""
data = request.get_json()
if not data or 'url' not in data or 'username' not in data or 'password' not in data:
return jsonify({'message': 'Missing required fields'}), 400
try:
headers = {
'Content-Type': 'application/json'
}
if data.get('otp'):
headers['X-Gitea-OTP'] = data['otp']
# Generate token with required scopes
token_data = {
'name': f'docupulse_{datetime.utcnow().strftime("%Y%m%d_%H%M%S")}',
'scopes': [
'read:activitypub',
'read:issue',
'write:misc',
'read:notification',
'read:organization',
'read:package',
'read:repository',
'read:user'
]
}
# Make request to Gitea API
response = requests.post(
f'{data["url"]}/api/v1/users/{data["username"]}/tokens',
headers=headers,
json=token_data,
auth=(data['username'], data['password'])
)
if response.status_code == 201:
token_data = response.json()
return jsonify({
'token': token_data['sha1'],
'name': token_data['name'],
'token_last_eight': token_data['token_last_eight']
}), 200
else:
return jsonify({'message': f'Failed to generate token: {response.json().get("message", "Unknown error")}'}), 400
except Exception as e:
return jsonify({'message': f'Failed to generate token: {str(e)}'}), 400
@launch_api.route('/list-gitea-repos', methods=['POST'])
@csrf.exempt
def list_gitea_repos():
"""List repositories from Gitea"""
data = request.get_json()
if not data or 'url' not in data or 'token' not in data:
return jsonify({'message': 'Missing required fields'}), 400
try:
# Try different authentication methods
headers = {
'Accept': 'application/json'
}
# First try token in Authorization header
headers['Authorization'] = f'token {data["token"]}'
# Get user's repositories
response = requests.get(
f'{data["url"]}/api/v1/user/repos',
headers=headers
)
# If that fails, try token as query parameter
if response.status_code != 200:
response = requests.get(
f'{data["url"]}/api/v1/user/repos?token={data["token"]}',
headers={'Accept': 'application/json'}
)
if response.status_code == 200:
return jsonify({
'repositories': response.json()
}), 200
else:
return jsonify({'message': f'Failed to list repositories: {response.json().get("message", "Unknown error")}'}), 400
except Exception as e:
return jsonify({'message': f'Failed to list repositories: {str(e)}'}), 400
@launch_api.route('/list-gitea-branches', methods=['POST'])
@csrf.exempt
def list_gitea_branches():
"""List branches from a Gitea repository"""
data = request.get_json()
if not data or 'url' not in data or 'token' not in data or 'repo' not in data:
return jsonify({'message': 'Missing required fields'}), 400
try:
# Try different authentication methods
headers = {
'Accept': 'application/json'
}
# First try token in Authorization header
headers['Authorization'] = f'token {data["token"]}'
# Get repository branches
response = requests.get(
f'{data["url"]}/api/v1/repos/{data["repo"]}/branches',
headers=headers
)
# If that fails, try token as query parameter
if response.status_code != 200:
response = requests.get(
f'{data["url"]}/api/v1/repos/{data["repo"]}/branches?token={data["token"]}',
headers={'Accept': 'application/json'}
)
if response.status_code == 200:
return jsonify({
'branches': response.json()
}), 200
else:
return jsonify({'message': f'Failed to list branches: {response.json().get("message", "Unknown error")}'}), 400
except Exception as e:
return jsonify({'message': f'Failed to list branches: {str(e)}'}), 400
@launch_api.route('/list-gitlab-repos', methods=['POST'])
@csrf.exempt
def list_gitlab_repos():
"""List repositories from GitLab"""
data = request.get_json()
if not data or 'url' not in data or 'token' not in data:
return jsonify({'message': 'Missing required fields'}), 400
try:
headers = {
'PRIVATE-TOKEN': data['token'],
'Accept': 'application/json'
}
# Get user's projects (repositories)
response = requests.get(
f'{data["url"]}/api/v4/projects',
headers=headers,
params={'membership': 'true'} # Only get projects where user is a member
)
if response.status_code == 200:
return jsonify({
'repositories': response.json()
}), 200
else:
return jsonify({'message': f'Failed to list repositories: {response.json().get("message", "Unknown error")}'}), 400
except Exception as e:
return jsonify({'message': f'Failed to list repositories: {str(e)}'}), 400
@launch_api.route('/test-git-connection', methods=['POST'])
@csrf.exempt
def test_git_connection():
"""Test the connection to a Git repository"""
data = request.get_json()
if not data or 'provider' not in data or 'url' not in data or 'username' not in data or 'token' not in data or 'repo' not in data:
return jsonify({'message': 'Missing required fields'}), 400
try:
if data['provider'] == 'gitea':
# Test Gitea connection with different authentication methods
headers = {
'Accept': 'application/json'
}
# First try token in Authorization header
headers['Authorization'] = f'token {data["token"]}'
# Try to get repository information
response = requests.get(
f'{data["url"]}/api/v1/repos/{data["repo"]}',
headers=headers
)
# If that fails, try token as query parameter
if response.status_code != 200:
response = requests.get(
f'{data["url"]}/api/v1/repos/{data["repo"]}?token={data["token"]}',
headers={'Accept': 'application/json'}
)
if response.status_code == 200:
return jsonify({'message': 'Connection successful'}), 200
else:
return jsonify({'message': f'Connection failed: {response.json().get("message", "Unknown error")}'}), 400
elif data['provider'] == 'gitlab':
# Test GitLab connection
headers = {
'PRIVATE-TOKEN': data['token'],
'Accept': 'application/json'
}
# Try to get repository information
response = requests.get(
f'{data["url"]}/api/v4/projects/{data["repo"].replace("/", "%2F")}',
headers=headers
)
if response.status_code == 200:
return jsonify({'message': 'Connection successful'}), 200
else:
return jsonify({'message': f'Connection failed: {response.json().get("message", "Unknown error")}'}), 400
else:
return jsonify({'message': 'Invalid Git provider'}), 400
except Exception as e:
return jsonify({'message': f'Connection failed: {str(e)}'}), 400
@launch_api.route('/save-git-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_git_connection(current_user):
if not current_user.is_admin:
return jsonify({'error': 'Unauthorized'}), 403
data = request.get_json()
provider = data.get('provider')
url = data.get('url')
username = data.get('username')
token = data.get('token')
repo = data.get('repo')
if not provider or not url or not username or not token or not repo:
return jsonify({'error': 'Missing required fields'}), 400
if provider not in ['gitea', 'gitlab']:
return jsonify({'error': 'Invalid provider'}), 400
try:
# Save Git settings
KeyValueSettings.set_value('git_settings', {
'provider': provider,
'url': url,
'username': username,
'token': token,
'repo': repo
})
return jsonify({'message': 'Settings saved successfully'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@launch_api.route('/create-proxy-host', methods=['POST'])
@csrf.exempt
@token_required
def create_proxy_host(current_user):
if not current_user.is_admin:
return jsonify({'error': 'Unauthorized'}), 403
data = request.get_json()
domains = data.get('domains')
scheme = data.get('scheme', 'http')
forward_ip = data.get('forward_ip')
forward_port = data.get('forward_port')
if not domains or not forward_ip or not forward_port:
return jsonify({'error': 'Missing required fields'}), 400
try:
# Get NGINX settings
nginx_settings = KeyValueSettings.get_value('nginx_settings')
if not nginx_settings:
return jsonify({'error': 'NGINX settings not configured'}), 400
# First, get the JWT token
token_response = requests.post(
f"{nginx_settings['url'].rstrip('/')}/api/tokens",
json={
'identity': nginx_settings['username'],
'secret': nginx_settings['password']
},
headers={'Content-Type': 'application/json'},
timeout=5
)
if token_response.status_code != 200:
return jsonify({'error': 'Failed to authenticate with NGINX Proxy Manager'}), 400
token_data = token_response.json()
token = token_data.get('token')
if not token:
return jsonify({'error': 'No token received from NGINX Proxy Manager'}), 400
# Create the proxy host
proxy_host_data = {
'domain_names': domains,
'forward_scheme': scheme,
'forward_host': forward_ip,
'forward_port': int(forward_port),
'ssl_forced': True,
'caching_enabled': True,
'block_exploits': True,
'allow_websocket_upgrade': True,
'http2_support': True,
'hsts_enabled': True,
'hsts_subdomains': True,
'meta': {
'letsencrypt_agree': True,
'dns_challenge': False
}
}
response = requests.post(
f"{nginx_settings['url'].rstrip('/')}/api/nginx/proxy-hosts",
json=proxy_host_data,
headers={
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
},
timeout=5
)
if response.status_code == 200:
return jsonify({
'message': 'Proxy host created successfully',
'data': response.json()
})
else:
error_data = response.json()
return jsonify({
'error': f'Failed to create proxy host: {error_data.get("message", "Unknown error")}'
}), 400
except Exception as e:
return jsonify({'error': str(e)}), 500
@launch_api.route('/create-ssl-certificate', methods=['POST'])
@csrf.exempt
@token_required
def create_ssl_certificate(current_user):
try:
data = request.get_json()
current_app.logger.info(f"Received request data: {data}")
domains = data.get('domains')
proxy_host_id = data.get('proxy_host_id')
nginx_url = data.get('nginx_url')
current_app.logger.info(f"Extracted data - domains: {domains}, proxy_host_id: {proxy_host_id}, nginx_url: {nginx_url}")
if not all([domains, proxy_host_id, nginx_url]):
missing_fields = []
if not domains: missing_fields.append('domains')
if not proxy_host_id: missing_fields.append('proxy_host_id')
if not nginx_url: missing_fields.append('nginx_url')
current_app.logger.error(f"Missing required fields: {missing_fields}")
return jsonify({
'success': False,
'error': f'Missing required fields: {", ".join(missing_fields)}'
}), 400
# Get NGINX settings
nginx_settings = KeyValueSettings.get_value('nginx_settings')
if not nginx_settings:
return jsonify({
'success': False,
'error': 'NGINX settings not configured'
}), 400
# First, get the JWT token
token_response = requests.post(
f"{nginx_settings['url'].rstrip('/')}/api/tokens",
json={
'identity': nginx_settings['username'],
'secret': nginx_settings['password']
},
headers={'Content-Type': 'application/json'},
timeout=5
)
if token_response.status_code != 200:
return jsonify({
'success': False,
'error': 'Failed to authenticate with NGINX Proxy Manager'
}), 400
token_data = token_response.json()
token = token_data.get('token')
if not token:
return jsonify({
'success': False,
'error': 'No token received from NGINX Proxy Manager'
}), 400
# Create the SSL certificate
ssl_request_data = {
'provider': 'letsencrypt',
'domain_names': domains,
'meta': {
'letsencrypt_agree': True,
'dns_challenge': False
}
}
current_app.logger.info(f"Making SSL certificate request to {nginx_url}/api/nginx/ssl with data: {ssl_request_data}")
ssl_response = requests.post(
f"{nginx_url}/api/nginx/ssl",
headers={
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
},
json=ssl_request_data
)
current_app.logger.info(f"SSL certificate response status: {ssl_response.status_code}")
current_app.logger.info(f"SSL certificate response headers: {dict(ssl_response.headers)}")
if not ssl_response.ok:
error_text = ssl_response.text
current_app.logger.error(f"Failed to create SSL certificate: {error_text}")
return jsonify({
'success': False,
'error': f'Failed to create SSL certificate: {error_text}'
}), ssl_response.status_code
ssl_data = ssl_response.json()
current_app.logger.info(f"SSL certificate created successfully: {ssl_data}")
# Get the certificate ID
cert_id = ssl_data.get('id')
if not cert_id:
current_app.logger.error("No certificate ID received in response")
return jsonify({
'success': False,
'error': 'No certificate ID received'
}), 500
# Update the proxy host with the certificate
update_request_data = {
'ssl_certificate_id': cert_id
}
current_app.logger.info(f"Updating proxy host {proxy_host_id} with data: {update_request_data}")
update_response = requests.put(
f"{nginx_url}/api/nginx/proxy-hosts/{proxy_host_id}",
headers={
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
},
json=update_request_data
)
current_app.logger.info(f"Update response status: {update_response.status_code}")
current_app.logger.info(f"Update response headers: {dict(update_response.headers)}")
if not update_response.ok:
error_text = update_response.text
current_app.logger.error(f"Failed to update proxy host: {error_text}")
return jsonify({
'success': False,
'error': f'Failed to update proxy host: {error_text}'
}), update_response.status_code
update_data = update_response.json()
current_app.logger.info(f"Proxy host updated successfully: {update_data}")
return jsonify({
'success': True,
'data': {
'certificate': ssl_data,
'proxy_host': update_data
}
})
except Exception as e:
current_app.logger.error(f"Error in create_ssl_certificate: {str(e)}")
return jsonify({
'success': False,
'error': str(e)
}), 500
@launch_api.route('/download-docker-compose', methods=['POST'])
@csrf.exempt
def download_docker_compose():
"""Download docker-compose.yml from the repository"""
data = request.get_json()
if not data or 'repository' not in data or 'branch' not in data:
return jsonify({'message': 'Missing required fields'}), 400
try:
# Get Git settings
git_settings = KeyValueSettings.get_value('git_settings')
if not git_settings:
return jsonify({'message': 'Git settings not configured'}), 400
# Determine the provider and set up the appropriate API call
if git_settings['provider'] == 'gitea':
# For Gitea
headers = {
'Accept': 'application/json',
'Authorization': f'token {git_settings["token"]}'
}
# Try to get the file content
response = requests.get(
f'{git_settings["url"]}/api/v1/repos/{data["repository"]}/contents/docker-compose.yml',
headers=headers,
params={'ref': data['branch']}
)
if response.status_code != 200:
# Try token as query parameter if header auth fails
response = requests.get(
f'{git_settings["url"]}/api/v1/repos/{data["repository"]}/contents/docker-compose.yml?token={git_settings["token"]}',
headers={'Accept': 'application/json'},
params={'ref': data['branch']}
)
elif git_settings['provider'] == 'gitlab':
# For GitLab
headers = {
'PRIVATE-TOKEN': git_settings['token'],
'Accept': 'application/json'
}
# Get the file content
response = requests.get(
f'{git_settings["url"]}/api/v4/projects/{data["repository"].replace("/", "%2F")}/repository/files/docker-compose.yml/raw',
headers=headers,
params={'ref': data['branch']}
)
else:
return jsonify({'message': 'Unsupported Git provider'}), 400
if response.status_code == 200:
# For Gitea, we need to decode the content from base64
if git_settings['provider'] == 'gitea':
content = base64.b64decode(response.json()['content']).decode('utf-8')
else:
content = response.text
return jsonify({
'success': True,
'content': content
})
else:
return jsonify({
'message': f'Failed to download docker-compose.yml: {response.json().get("message", "Unknown error")}'
}), 400
except Exception as e:
current_app.logger.error(f"Error downloading docker-compose.yml: {str(e)}")
return jsonify({'message': f'Error downloading docker-compose.yml: {str(e)}'}), 500
@launch_api.route('/deploy-stack', methods=['POST'])
@csrf.exempt
def deploy_stack():
try:
data = request.get_json()
if not data or 'name' not in data or 'StackFileContent' not in data:
return jsonify({'error': 'Missing required fields'}), 400
# Get Portainer settings
portainer_settings = KeyValueSettings.get_value('portainer_settings')
if not portainer_settings:
return jsonify({'error': 'Portainer settings not configured'}), 400
# Verify Portainer authentication
auth_response = requests.get(
f"{portainer_settings['url'].rstrip('/')}/api/status",
headers={
'X-API-Key': portainer_settings['api_key'],
'Accept': 'application/json'
},
timeout=30 # 30 seconds timeout for status check
)
if not auth_response.ok:
current_app.logger.error(f"Portainer authentication failed: {auth_response.text}")
return jsonify({'error': 'Failed to authenticate with Portainer'}), 401
# Get Portainer endpoint ID (assuming it's the first endpoint)
endpoint_response = requests.get(
f"{portainer_settings['url'].rstrip('/')}/api/endpoints",
headers={
'X-API-Key': portainer_settings['api_key'],
'Accept': 'application/json'
},
timeout=30 # 30 seconds timeout for endpoint check
)
if not endpoint_response.ok:
error_text = endpoint_response.text
try:
error_json = endpoint_response.json()
error_text = error_json.get('message', error_text)
except:
pass
return jsonify({'error': f'Failed to get Portainer endpoints: {error_text}'}), 500
endpoints = endpoint_response.json()
if not endpoints:
return jsonify({'error': 'No Portainer endpoints found'}), 400
endpoint_id = endpoints[0]['Id']
# Log the request data
current_app.logger.info(f"Creating stack with data: {json.dumps(data)}")
current_app.logger.info(f"Using endpoint ID: {endpoint_id}")
# First, check if a stack with this name already exists
stacks_url = f"{portainer_settings['url'].rstrip('/')}/api/stacks"
stacks_response = requests.get(
stacks_url,
headers={
'X-API-Key': portainer_settings['api_key'],
'Accept': 'application/json'
},
params={'filters': json.dumps({'Name': data['name']})},
timeout=30
)
if stacks_response.ok:
existing_stacks = stacks_response.json()
for stack in existing_stacks:
if stack['Name'] == data['name']:
current_app.logger.info(f"Found existing stack: {stack['Name']} (ID: {stack['Id']})")
return jsonify({
'name': stack['Name'],
'id': stack['Id'],
'status': 'existing'
})
# If no existing stack found, proceed with creation
url = f"{portainer_settings['url'].rstrip('/')}/api/stacks/create/standalone/string"
current_app.logger.info(f"Making request to: {url}")
# Prepare the request body according to Portainer's API spec
request_body = data
# Add endpointId as a query parameter
params = {'endpointId': endpoint_id}
# Set a longer timeout for stack creation (10 minutes)
create_response = requests.post(
url,
headers={
'X-API-Key': portainer_settings['api_key'],
'Content-Type': 'application/json',
'Accept': 'application/json'
},
params=params,
json=request_body,
timeout=600 # 10 minutes timeout for stack creation
)
# Log the response details
current_app.logger.info(f"Response status: {create_response.status_code}")
current_app.logger.info(f"Response headers: {dict(create_response.headers)}")
response_text = create_response.text
current_app.logger.info(f"Response body: {response_text}")
if not create_response.ok:
error_message = response_text
try:
error_json = create_response.json()
error_message = error_json.get('message', error_message)
except:
pass
return jsonify({'error': f'Failed to create stack: {error_message}'}), 500
stack_info = create_response.json()
return jsonify({
'name': stack_info['Name'],
'id': stack_info['Id'],
'status': 'created'
})
except requests.exceptions.Timeout:
current_app.logger.error("Request timed out while deploying stack")
return jsonify({'error': 'Request timed out while deploying stack. The operation may still be in progress.'}), 504
except Exception as e:
current_app.logger.error(f"Error deploying stack: {str(e)}")
return jsonify({'error': str(e)}), 500
@launch_api.route('/save-instance', methods=['POST'])
@csrf.exempt
def save_instance():
try:
if not request.is_json:
return jsonify({'error': 'Request must be JSON'}), 400
data = request.get_json()
required_fields = ['name', 'port', 'domains', 'stack_id', 'stack_name', 'status', 'repository', 'branch']
if not all(field in data for field in required_fields):
missing_fields = [field for field in required_fields if field not in data]
return jsonify({'error': f'Missing required fields: {", ".join(missing_fields)}'}), 400
# Save instance data
instance_data = {
'name': data['name'],
'port': data['port'],
'domains': data['domains'],
'stack_id': data['stack_id'],
'stack_name': data['stack_name'],
'status': data['status'],
'repository': data['repository'],
'branch': data['branch'],
'created_at': datetime.utcnow().isoformat()
}
# Save to database using KeyValueSettings
KeyValueSettings.set_value(f'instance_{data["name"]}', instance_data)
return jsonify({
'message': 'Instance data saved successfully',
'data': instance_data
})
except Exception as e:
current_app.logger.error(f"Error saving instance data: {str(e)}")
return jsonify({'error': str(e)}), 500
@launch_api.route('/apply-company-information', methods=['POST'])
@csrf.exempt
def apply_company_information():
"""Apply company information to a launched instance"""
try:
if not request.is_json:
return jsonify({'error': 'Request must be JSON'}), 400
data = request.get_json()
required_fields = ['instance_url', 'company_data']
if not all(field in data for field in required_fields):
missing_fields = [field for field in required_fields if field not in data]
return jsonify({'error': f'Missing required fields: {", ".join(missing_fields)}'}), 400
instance_url = data['instance_url']
company_data = data['company_data']
# Get the instance from database to get the connection token
instance = Instance.query.filter_by(main_url=instance_url).first()
if not instance:
return jsonify({'error': 'Instance not found in database'}), 404
if not instance.connection_token:
return jsonify({'error': 'Instance not authenticated'}), 400
# First get JWT token from the instance
jwt_response = requests.post(
f"{instance_url.rstrip('/')}/api/admin/management-token",
headers={
'X-API-Key': instance.connection_token,
'Accept': 'application/json'
},
timeout=10
)
if jwt_response.status_code != 200:
return jsonify({'error': f'Failed to get JWT token: {jwt_response.text}'}), 400
jwt_data = jwt_response.json()
jwt_token = jwt_data.get('token')
if not jwt_token:
return jsonify({'error': 'No JWT token received'}), 400
# Prepare company data for the API
api_company_data = {
'company_name': company_data.get('name'),
'company_industry': company_data.get('industry'),
'company_email': company_data.get('email'),
'company_website': company_data.get('website'),
'company_address': company_data.get('streetAddress'),
'company_city': company_data.get('city'),
'company_state': company_data.get('state'),
'company_zip': company_data.get('zipCode'),
'company_country': company_data.get('country'),
'company_description': company_data.get('description'),
'company_phone': company_data.get('phone')
}
# Apply company information to the instance
company_response = requests.put(
f"{instance_url.rstrip('/')}/api/admin/settings",
headers={
'Authorization': f'Bearer {jwt_token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
},
json=api_company_data,
timeout=10
)
if company_response.status_code != 200:
return jsonify({'error': f'Failed to apply company information: {company_response.text}'}), 400
# Update the instance company name in our database
instance.company = company_data.get('name', 'Unknown')
db.session.commit()
return jsonify({
'message': 'Company information applied successfully',
'data': {
'company_name': company_data.get('name'),
'instance_url': instance_url
}
})
except Exception as e:
current_app.logger.error(f"Error applying company information: {str(e)}")
return jsonify({'error': str(e)}), 500
@launch_api.route('/apply-colors', methods=['POST'])
@csrf.exempt
def apply_colors():
"""Apply colors to a launched instance"""
try:
if not request.is_json:
return jsonify({'error': 'Request must be JSON'}), 400
data = request.get_json()
required_fields = ['instance_url', 'colors_data']
if not all(field in data for field in required_fields):
missing_fields = [field for field in required_fields if field not in data]
return jsonify({'error': f'Missing required fields: {", ".join(missing_fields)}'}), 400
instance_url = data['instance_url']
colors_data = data['colors_data']
# Get the instance from database to get the connection token
instance = Instance.query.filter_by(main_url=instance_url).first()
if not instance:
return jsonify({'error': 'Instance not found in database'}), 404
if not instance.connection_token:
return jsonify({'error': 'Instance not authenticated'}), 400
# First get JWT token from the instance
jwt_response = requests.post(
f"{instance_url.rstrip('/')}/api/admin/management-token",
headers={
'X-API-Key': instance.connection_token,
'Accept': 'application/json'
},
timeout=10
)
if jwt_response.status_code != 200:
return jsonify({'error': f'Failed to get JWT token: {jwt_response.text}'}), 400
jwt_data = jwt_response.json()
jwt_token = jwt_data.get('token')
if not jwt_token:
return jsonify({'error': 'No JWT token received'}), 400
# Prepare colors data for the API
api_colors_data = {
'primary_color': colors_data.get('primary'),
'secondary_color': colors_data.get('secondary')
}
# Apply colors to the instance
colors_response = requests.put(
f"{instance_url.rstrip('/')}/api/admin/settings",
headers={
'Authorization': f'Bearer {jwt_token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
},
json=api_colors_data,
timeout=10
)
if colors_response.status_code != 200:
return jsonify({'error': f'Failed to apply colors: {colors_response.text}'}), 400
return jsonify({
'message': 'Colors applied successfully',
'data': {
'primary_color': colors_data.get('primary'),
'secondary_color': colors_data.get('secondary'),
'instance_url': instance_url
}
})
except Exception as e:
current_app.logger.error(f"Error applying colors: {str(e)}")
return jsonify({'error': str(e)}), 500
@launch_api.route('/update-admin-credentials', methods=['POST'])
@csrf.exempt
def update_admin_credentials():
"""Update admin credentials on a launched instance"""
try:
if not request.is_json:
return jsonify({'error': 'Request must be JSON'}), 400
data = request.get_json()
required_fields = ['instance_url', 'email']
if not all(field in data for field in required_fields):
missing_fields = [field for field in required_fields if field not in data]
return jsonify({'error': f'Missing required fields: {", ".join(missing_fields)}'}), 400
instance_url = data['instance_url']
email = data['email']
# Get the instance from database to get the connection token
instance = Instance.query.filter_by(main_url=instance_url).first()
if not instance:
return jsonify({'error': 'Instance not found in database'}), 404
if not instance.connection_token:
return jsonify({'error': 'Instance not authenticated'}), 400
# First get JWT token from the instance
jwt_response = requests.post(
f"{instance_url.rstrip('/')}/api/admin/management-token",
headers={
'X-API-Key': instance.connection_token,
'Accept': 'application/json'
},
timeout=10
)
if jwt_response.status_code != 200:
return jsonify({'error': f'Failed to get JWT token: {jwt_response.text}'}), 400
jwt_data = jwt_response.json()
jwt_token = jwt_data.get('token')
if not jwt_token:
return jsonify({'error': 'No JWT token received'}), 400
# Generate a secure password
import secrets
import string
alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
new_password = ''.join(secrets.choice(alphabet) for i in range(16))
# First, login with default credentials to get admin token
login_response = requests.post(
f"{instance_url.rstrip('/')}/api/admin/login",
headers={
'Content-Type': 'application/json',
'Accept': 'application/json'
},
json={
'email': 'administrator@docupulse.com',
'password': 'changeme'
},
timeout=10
)
if login_response.status_code != 200:
return jsonify({'error': f'Failed to login with default credentials: {login_response.text}'}), 400
login_data = login_response.json()
if login_data.get('status') != 'success' or not login_data.get('token'):
return jsonify({'error': 'Failed to get admin token'}), 400
admin_token = login_data.get('token')
# Get the admin user ID first
users_response = requests.get(
f"{instance_url.rstrip('/')}/api/admin/contacts",
headers={
'Authorization': f'Bearer {admin_token}',
'Accept': 'application/json'
},
timeout=10
)
if users_response.status_code != 200:
return jsonify({'error': f'Failed to get users: {users_response.text}'}), 400
users_data = users_response.json()
admin_user = None
# Find the administrator user
for user in users_data:
if user.get('email') == 'administrator@docupulse.com':
admin_user = user
break
if not admin_user:
return jsonify({'error': 'Administrator user not found'}), 404
admin_user_id = admin_user.get('id')
# Update the admin user with new email and password
update_response = requests.put(
f"{instance_url.rstrip('/')}/api/admin/contacts/{admin_user_id}",
headers={
'Authorization': f'Bearer {admin_token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
},
json={
'email': email,
'password': new_password,
'username': 'administrator',
'last_name': 'Administrator',
'role': 'admin'
},
timeout=10
)
if update_response.status_code != 200:
return jsonify({'error': f'Failed to update admin credentials: {update_response.text}'}), 400
return jsonify({
'message': 'Admin credentials updated successfully',
'data': {
'email': email,
'password': new_password,
'username': 'administrator',
'instance_url': instance_url
}
})
except Exception as e:
current_app.logger.error(f"Error updating admin credentials: {str(e)}")
return jsonify({'error': str(e)}), 500