1158 lines
39 KiB
Python
1158 lines
39 KiB
Python
from flask import Blueprint, jsonify, request, current_app, make_response, flash, redirect, url_for
|
|
from functools import wraps
|
|
from models import (
|
|
KeyValueSettings, User, Room, Conversation, RoomFile,
|
|
SiteSettings, DocuPulseSettings, Event, Mail, ManagementAPIKey
|
|
)
|
|
from extensions import db, csrf
|
|
from datetime import datetime, timedelta
|
|
import os
|
|
import jwt
|
|
from werkzeug.security import generate_password_hash
|
|
import secrets
|
|
from flask_login import login_user
|
|
import requests
|
|
import json
|
|
|
|
# Blueprint on which every route and request hook in this module is registered.
admin_api = Blueprint('admin_api', __name__)
|
|
|
|
def add_cors_headers(response):
    """Set the permissive CORS headers on *response* and return the same object."""
    cors_headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
        'Access-Control-Allow-Headers': 'Content-Type, Authorization, X-API-Key, X-CSRF-Token',
    }
    for header_name, header_value in cors_headers.items():
        response.headers[header_name] = header_value
    return response
|
|
|
|
@admin_api.before_request
def handle_preflight():
    """Short-circuit OPTIONS (CORS preflight) requests with an empty CORS-enabled response."""
    if request.method != 'OPTIONS':
        # Returning None lets the normal view dispatch continue.
        return None
    return add_cors_headers(make_response())
|
|
|
|
@admin_api.after_request
def after_request(response):
    """Stamp the CORS headers onto every response leaving this blueprint."""
    response_with_cors = add_cors_headers(response)
    return response_with_cors
|
|
|
|
def token_required(f):
    """Decorator that authenticates a view with a JWT from the Authorization header.

    The token is the second space-separated field of the ``Authorization``
    header (for example ``Bearer <token>``) and is verified with the
    application's ``SECRET_KEY`` using HS256.

    The wrapped view receives the authenticated admin ``User`` as its first
    positional argument.  A token carrying a truthy ``is_management`` claim is
    accepted without any user lookup and the view receives ``None`` instead,
    so views must be prepared for ``current_user`` being ``None``.

    A 401 JSON response is returned when the token is missing, cannot be
    decoded, lacks a ``user_id``, or does not map to an admin user.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None
        auth_header = request.headers.get('Authorization')
        if auth_header:
            parts = auth_header.split(" ")
            # A header with a single field (e.g. just "Bearer") is treated as
            # "no token" rather than raising IndexError.
            if len(parts) > 1:
                token = parts[1]

        if not token:
            return jsonify({'message': 'Token is missing!'}), 401

        # Only the decoding step is guarded: exceptions raised by the view
        # itself must propagate instead of being reported as a bad token.
        try:
            data = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=["HS256"])
        except jwt.InvalidTokenError:
            return jsonify({'message': 'Invalid token!'}), 401

        # Check if it's a management tool token
        if data.get('is_management'):
            return f(None, *args, **kwargs)  # Pass None as current_user for management tool

        user_id = data.get('user_id')
        if user_id is None:
            return jsonify({'message': 'Invalid token!'}), 401

        current_user = User.query.get(user_id)
        if not current_user or not current_user.is_admin:
            return jsonify({'message': 'Invalid token or insufficient permissions!'}), 401

        return f(current_user, *args, **kwargs)

    return decorated
|
|
|
|
def generate_management_api_key():
    """Return a new random URL-safe API key built from 32 random bytes."""
    random_byte_count = 32
    return secrets.token_urlsafe(random_byte_count)
|
|
|
|
def validate_management_api_key(api_key):
    """Return True when *api_key* matches an active ManagementAPIKey row.

    On a match the row's ``last_used_at`` is set to the current UTC time and
    the session is committed; otherwise nothing is written and False is
    returned.
    """
    matching_key = ManagementAPIKey.query.filter_by(api_key=api_key, is_active=True).first()
    if not matching_key:
        return False

    matching_key.last_used_at = datetime.utcnow()
    db.session.commit()
    return True
|
|
|
|
@admin_api.route('/login', methods=['POST'])
@csrf.exempt
def admin_login():
    """Log an admin user in, either as an API client (JWT) or a browser (session).

    A request counts as an API request when its body is JSON or its
    ``Accept`` header is exactly ``application/json``; credentials are then
    read from the JSON body and failures are reported as JSON.  Otherwise
    credentials come from the submitted form and failures are flashed before
    redirecting to ``auth.login``.

    Returns:
        API: ``{'token', 'status'}`` with 200 on success (token valid for one
        day), or ``{'message', 'status': 'error'}`` with 400/401/500.
        Web: a redirect to ``main.dashboard`` on success, else to
        ``auth.login``.
    """
    # Computed before the try block so the exception handler can always use it.
    # request.is_json also recognises "application/json; charset=utf-8".
    is_api_request = request.is_json or request.headers.get('Accept') == 'application/json'

    def reject(message, status_code):
        """Report a failure in the style matching the request type."""
        if is_api_request:
            return jsonify({'message': message, 'status': 'error'}), status_code
        flash(message, 'error')
        return redirect(url_for('auth.login'))

    try:
        if is_api_request:
            # silent=True turns a malformed/non-JSON body into None so it is
            # answered with the 400 below rather than the generic 500.
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                data = None
        else:
            data = request.form

        if not data or 'email' not in data or 'password' not in data:
            return reject('Email and password are required', 400)

        user = User.query.filter_by(email=data['email']).first()
        if not user or not user.is_admin or not user.check_password(data['password']):
            return reject('Invalid credentials or not an admin', 401)

        # For API requests, return JWT token
        if is_api_request:
            token = jwt.encode({
                'user_id': user.id,
                'is_admin': True,
                'exp': datetime.utcnow() + timedelta(days=1)  # Token expires in 1 day
            }, current_app.config['SECRET_KEY'], algorithm="HS256")

            return jsonify({
                'token': token,
                'status': 'success'
            }), 200

        # For web requests, use session-based auth
        login_user(user)
        return redirect(url_for('main.dashboard'))

    except Exception as e:
        current_app.logger.error(f"Login error: {str(e)}")
        return reject('An error occurred during login', 500)
|
|
|
|
@admin_api.route('/management-token', methods=['POST'])
@csrf.exempt
def get_management_token():
    """Exchange a valid ``X-API-Key`` header for a management JWT (401 otherwise)."""
    supplied_key = request.headers.get('X-API-Key')
    key_accepted = bool(supplied_key) and validate_management_api_key(supplied_key)
    if not key_accepted:
        return jsonify({'message': 'Invalid API key'}), 401

    # No "exp" claim is included, so the issued token does not expire.
    claims = {
        'user_id': 0,  # Special user ID for management tool
        'is_management': True,
    }
    signed_token = jwt.encode(claims, current_app.config['SECRET_KEY'], algorithm="HS256")
    return jsonify({'token': signed_token})
|
|
|
|
@admin_api.route('/management-api-key', methods=['POST'])
@csrf.exempt
@token_required
def create_management_api_key(current_user):
    """Create a new API key for the management tool (only accessible by admin users).

    Expects a JSON body with a ``name``.  Responds 201 with the generated key,
    its name and creation time; 400 when ``name`` is missing; 403 when the
    caller is not an admin user.
    """
    # current_user is None for management-tool tokens, which have no user.
    if current_user is None or not current_user.is_admin:
        return jsonify({'message': 'Insufficient permissions'}), 403

    data = request.get_json()
    if not data or 'name' not in data:
        return jsonify({'message': 'Name is required'}), 400

    api_key = generate_management_api_key()
    key = ManagementAPIKey(
        api_key=api_key,
        name=data['name'],
        created_by=current_user.id
    )
    db.session.add(key)
    db.session.commit()

    return jsonify({
        'api_key': api_key,
        'name': key.name,
        'created_at': key.created_at.isoformat(),
        'message': 'API key generated successfully. Store this key securely as it will not be shown again.'
    }), 201
|
|
|
|
@admin_api.route('/management-api-keys', methods=['GET'])
@csrf.exempt
@token_required
def list_management_api_keys(current_user):
    """List all management API keys (only accessible by admin users).

    The key material itself is not included; each entry carries the id, name,
    timestamps, active flag and creator.  Responds 403 when the caller is not
    an admin user.
    """
    # current_user is None for management-tool tokens, which have no user.
    if current_user is None or not current_user.is_admin:
        return jsonify({'message': 'Insufficient permissions'}), 403

    keys = ManagementAPIKey.query.all()
    return jsonify([{
        'id': key.id,
        'name': key.name,
        'created_at': key.created_at.isoformat(),
        'last_used_at': key.last_used_at.isoformat() if key.last_used_at else None,
        'is_active': key.is_active,
        'created_by': key.created_by
    } for key in keys])
|
|
|
|
@admin_api.route('/management-api-key/<int:key_id>', methods=['DELETE'])
@csrf.exempt
@token_required
def revoke_management_api_key(current_user, key_id):
    """Revoke a management API key (only accessible by admin users).

    The key row is kept and only marked inactive.  Responds 404 when no key
    has id *key_id* and 403 when the caller is not an admin user.
    """
    # current_user is None for management-tool tokens, which have no user.
    if current_user is None or not current_user.is_admin:
        return jsonify({'message': 'Insufficient permissions'}), 403

    key = ManagementAPIKey.query.get(key_id)
    if not key:
        return jsonify({'message': 'API key not found'}), 404

    key.is_active = False
    db.session.commit()
    return jsonify({'message': 'API key revoked successfully'})
|
|
|
|
# Key-Value Settings CRUD
|
|
@admin_api.route('/key-value', methods=['GET'])
@csrf.exempt
@token_required
def get_key_values(current_user):
    """Return every stored key/value setting as a JSON list."""
    pairs = []
    for entry in KeyValueSettings.query.all():
        pairs.append({'key': entry.key, 'value': entry.value})
    return jsonify(pairs)
|
|
|
|
@admin_api.route('/key-value/<key>', methods=['GET'])
@token_required
def get_key_value(current_user, key):
    """Return the setting stored under *key*, or a 404 message if there is none."""
    entry = KeyValueSettings.query.filter_by(key=key).first()
    if entry:
        return jsonify({'key': entry.key, 'value': entry.value})
    return jsonify({'message': 'Key not found'}), 404
|
|
|
|
@admin_api.route('/key-value', methods=['POST'])
@csrf.exempt
@token_required
def create_key_value(current_user):
    """Create a key/value setting from a JSON body containing ``key`` and ``value``.

    Responds 201 on success and 400 when a field is missing or the key is
    already present.
    """
    data = request.get_json()
    if not data or 'key' not in data or 'value' not in data:
        return jsonify({'message': 'Missing key or value'}), 400

    # The read/update/delete routes resolve a key with .first(), so a second
    # row for the same key would be unreachable; refuse to create it.
    if KeyValueSettings.query.filter_by(key=data['key']).first():
        return jsonify({'message': 'Key already exists'}), 400

    setting = KeyValueSettings(key=data['key'], value=data['value'])
    db.session.add(setting)
    db.session.commit()
    return jsonify({'message': 'Key-value pair created successfully'}), 201
|
|
|
|
@admin_api.route('/key-value/<key>', methods=['PUT'])
@csrf.exempt
@token_required
def update_key_value(current_user, key):
    """Overwrite the value stored under *key* with the JSON body's ``value``.

    Responds 404 when the key does not exist and 400 when no value is given.
    """
    entry = KeyValueSettings.query.filter_by(key=key).first()
    if not entry:
        return jsonify({'message': 'Key not found'}), 404

    payload = request.get_json()
    value_supplied = bool(payload) and 'value' in payload
    if not value_supplied:
        return jsonify({'message': 'Missing value'}), 400

    entry.value = payload['value']
    db.session.commit()
    return jsonify({'message': 'Key-value pair updated successfully'})
|
|
|
|
@admin_api.route('/key-value/<key>', methods=['DELETE'])
@csrf.exempt
@token_required
def delete_key_value(current_user, key):
    """Delete the setting stored under *key*; responds 404 if it does not exist."""
    entry = KeyValueSettings.query.filter_by(key=key).first()
    if entry:
        db.session.delete(entry)
        db.session.commit()
        return jsonify({'message': 'Key-value pair deleted successfully'})
    return jsonify({'message': 'Key not found'}), 404
|
|
|
|
# Contacts (Users) CRUD
|
|
@admin_api.route('/contacts', methods=['GET'])
@csrf.exempt
@token_required
def get_contacts(current_user):
    """Return all users as a JSON list of contact records."""

    def to_record(user):
        """Build the JSON-ready dict for one user, including the derived role."""
        if user.is_admin:
            role = 'admin'
        elif user.is_manager:
            role = 'manager'
        else:
            role = 'user'
        return {
            'id': user.id,
            'username': user.username,
            'email': user.email,
            'last_name': user.last_name,
            'phone': user.phone,
            'company': user.company,
            'position': user.position,
            'is_active': user.is_active,
            'is_admin': user.is_admin,
            'is_manager': user.is_manager,
            'role': role,
            'created_at': user.created_at.isoformat(),
        }

    return jsonify([to_record(user) for user in User.query.all()])
|
|
|
|
@admin_api.route('/contacts/<int:user_id>', methods=['GET'])
@csrf.exempt
@token_required
def get_contact(current_user, user_id):
    """Return the contact record for *user_id*, or a 404 message if unknown."""
    contact = User.query.get(user_id)
    if not contact:
        return jsonify({'message': 'User not found'}), 404

    # Admin takes precedence over manager when deriving the role label.
    if contact.is_admin:
        role = 'admin'
    elif contact.is_manager:
        role = 'manager'
    else:
        role = 'user'

    record = {
        'id': contact.id,
        'username': contact.username,
        'email': contact.email,
        'last_name': contact.last_name,
        'phone': contact.phone,
        'company': contact.company,
        'position': contact.position,
        'is_active': contact.is_active,
        'is_admin': contact.is_admin,
        'is_manager': contact.is_manager,
        'role': role,
        'created_at': contact.created_at.isoformat(),
    }
    return jsonify(record)
|
|
|
|
@admin_api.route('/contacts', methods=['POST'])
@csrf.exempt
@token_required
def create_contact(current_user):
    """Create a user from a JSON body.

    Required fields: ``username``, ``email``, ``last_name`` and ``role`` (one
    of ``admin``, ``manager``, ``user``).  Optional: ``phone``, ``company``,
    ``position``, ``password``.

    Responds 201 with the new user's id, or 400 when the body is missing or
    not a JSON object, a required field is absent, the email is already
    taken, or the role is invalid.
    """
    # silent=True plus the isinstance check: a missing, malformed or
    # non-object body is answered with 400 instead of raising TypeError.
    data = request.get_json(silent=True)
    required_fields = ['username', 'email', 'last_name', 'role']
    if not isinstance(data, dict) or not all(field in data for field in required_fields):
        return jsonify({'message': 'Missing required fields'}), 400

    if User.query.filter_by(email=data['email']).first():
        return jsonify({'message': 'Email already exists'}), 400

    # Validate role
    if data['role'] not in ['admin', 'manager', 'user']:
        return jsonify({'message': 'Invalid role'}), 400

    user = User(
        username=data['username'],
        email=data['email'],
        last_name=data['last_name'],
        phone=data.get('phone'),
        company=data.get('company'),
        position=data.get('position'),
        is_admin=data['role'] == 'admin',
        is_manager=data['role'] == 'manager'
    )
    # NOTE(review): without a supplied password the account is created with
    # the well-known default 'changeme' (possibly for an admin role).  Kept
    # for compatibility; consider a random password plus the setup-mail flow.
    user.set_password(data.get('password', 'changeme'))

    db.session.add(user)
    db.session.commit()
    return jsonify({'message': 'Contact created successfully', 'id': user.id}), 201
|
|
|
|
@admin_api.route('/contacts/<int:user_id>', methods=['PUT'])
@csrf.exempt
@token_required
def update_contact(current_user, user_id):
    """Update the user *user_id* from a JSON body; absent fields are left unchanged.

    Accepted fields: ``email``, ``role`` (``admin``/``manager``/``user``),
    ``username``, ``last_name``, ``phone``, ``company``, ``position``,
    ``is_active`` and ``password``.

    Responds 404 when the user does not exist and 400 when the body is not a
    JSON object, the new email is already taken, or the role is invalid.
    """
    user = User.query.get(user_id)
    if not user:
        return jsonify({'message': 'User not found'}), 404

    # A missing, malformed or non-object body is rejected up front instead of
    # raising TypeError on the membership tests below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': 'Invalid or missing JSON body'}), 400

    # Validate everything before touching the model so a rejected request
    # never leaves the user object half-modified.
    email_changed = 'email' in data and data['email'] != user.email
    if email_changed and User.query.filter_by(email=data['email']).first():
        return jsonify({'message': 'Email already exists'}), 400

    if 'role' in data and data['role'] not in ['admin', 'manager', 'user']:
        return jsonify({'message': 'Invalid role'}), 400

    if email_changed:
        user.email = data['email']

    # Update role if provided
    if 'role' in data:
        user.is_admin = data['role'] == 'admin'
        user.is_manager = data['role'] == 'manager'

    user.username = data.get('username', user.username)
    user.last_name = data.get('last_name', user.last_name)
    user.phone = data.get('phone', user.phone)
    user.company = data.get('company', user.company)
    user.position = data.get('position', user.position)
    user.is_active = data.get('is_active', user.is_active)

    if 'password' in data:
        user.set_password(data['password'])

    db.session.commit()
    return jsonify({'message': 'Contact updated successfully'})
|
|
|
|
@admin_api.route('/contacts/<int:user_id>', methods=['DELETE'])
@csrf.exempt
@token_required
def delete_contact(current_user, user_id):
    """Delete the user *user_id*; responds 404 if no such user exists."""
    contact = User.query.get(user_id)
    if contact:
        db.session.delete(contact)
        db.session.commit()
        return jsonify({'message': 'Contact deleted successfully'})
    return jsonify({'message': 'User not found'}), 404
|
|
|
|
# Statistics
|
|
@admin_api.route('/statistics', methods=['GET'])
@csrf.exempt
@token_required
def get_statistics(current_user):
    """Return the room count, conversation count and summed RoomFile sizes as JSON."""
    rooms = Room.query.count()
    conversations = Conversation.query.count()

    # Files with a falsy size (None or 0) contribute nothing to the total.
    stored_total = sum(room_file.size for room_file in RoomFile.query.all() if room_file.size)

    summary = {
        'rooms': rooms,
        'conversations': conversations,
        'total_storage': stored_total,
    }
    return jsonify(summary)
|
|
|
|
# Website Settings CRUD
|
|
@admin_api.route('/settings', methods=['GET'])
@csrf.exempt
@token_required
def get_settings(current_user):
    """Return the site's colour and company settings as a JSON object."""
    exposed_fields = (
        'primary_color',
        'secondary_color',
        'company_name',
        'company_logo',
        'company_website',
        'company_email',
        'company_phone',
        'company_address',
        'company_city',
        'company_state',
        'company_zip',
        'company_country',
        'company_description',
        'company_industry',
    )
    site_settings = SiteSettings.get_settings()
    return jsonify({name: getattr(site_settings, name) for name in exposed_fields})
|
|
|
|
@admin_api.route('/settings', methods=['PUT'])
@csrf.exempt
@token_required
def update_settings(current_user):
    """Update site settings from a JSON object of ``{field: value}`` pairs.

    Only keys that name a mapped, non-primary-key column of ``SiteSettings``
    are applied; every other key is ignored.  Responds 400 when the body is
    not a JSON object.
    """
    settings = SiteSettings.get_settings()

    # A missing, malformed or non-object body is rejected instead of raising
    # AttributeError on .items().
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': 'Invalid or missing JSON body'}), 400

    # hasattr() alone would let a client overwrite any attribute of the model
    # (primary key, methods, ORM internals), so restrict writes to data columns.
    # Assumes SiteSettings is a SQLAlchemy mapped class -- TODO confirm.
    editable_fields = {
        attr.key
        for attr in SiteSettings.__mapper__.column_attrs
        if not any(column.primary_key for column in attr.columns)
    }

    for key, value in data.items():
        if key in editable_fields:
            setattr(settings, key, value)

    db.session.commit()
    return jsonify({'message': 'Settings updated successfully'})
|
|
|
|
# Website Logs
|
|
@admin_api.route('/logs', methods=['GET'])
@csrf.exempt
@token_required
def get_logs(current_user):
    """Return one page of Event rows, newest first.

    Query parameters: ``page`` (default 1) and ``per_page`` (default 50).
    """
    page_number = request.args.get('page', 1, type=int)
    page_size = request.args.get('per_page', 50, type=int)

    newest_first = Event.query.order_by(Event.timestamp.desc())
    pagination = newest_first.paginate(page=page_number, per_page=page_size, error_out=False)

    serialized_events = []
    for entry in pagination.items:
        serialized_events.append({
            'id': entry.id,
            'event_type': entry.event_type,
            'user_id': entry.user_id,
            'timestamp': entry.timestamp.isoformat(),
            'details': entry.details,
            'ip_address': entry.ip_address,
            'user_agent': entry.user_agent,
        })

    return jsonify({
        'events': serialized_events,
        'total': pagination.total,
        'pages': pagination.pages,
        'current_page': pagination.page,
    })
|
|
|
|
# Mail Logs
|
|
@admin_api.route('/mail-logs', methods=['GET'])
@csrf.exempt
@token_required
def get_mail_logs(current_user):
    """Return one page of Mail rows, most recently created first.

    Query parameters: ``page`` (default 1) and ``per_page`` (default 50).
    """
    page_number = request.args.get('page', 1, type=int)
    page_size = request.args.get('per_page', 50, type=int)

    newest_first = Mail.query.order_by(Mail.created_at.desc())
    pagination = newest_first.paginate(page=page_number, per_page=page_size, error_out=False)

    serialized_mails = []
    for entry in pagination.items:
        # sent_at stays None until the mail has a send timestamp.
        sent_at = entry.sent_at.isoformat() if entry.sent_at else None
        serialized_mails.append({
            'id': entry.id,
            'recipient': entry.recipient,
            'subject': entry.subject,
            'status': entry.status,
            'created_at': entry.created_at.isoformat(),
            'sent_at': sent_at,
            'template_id': entry.template_id,
        })

    return jsonify({
        'mails': serialized_mails,
        'total': pagination.total,
        'pages': pagination.pages,
        'current_page': pagination.page,
    })
|
|
|
|
# Resend Setup Mail
|
|
@admin_api.route('/resend-setup-mail/<int:user_id>', methods=['POST'])
@csrf.exempt
@token_required
def resend_setup_mail(current_user, user_id):
    """Queue a new account-setup mail for the user *user_id*.

    Creates a PasswordSetupToken expiring in 7 days and a pending Mail row
    whose body links to ``setup-password/<token>`` under the request's host
    URL, then commits both.  Responds 404 when the user does not exist.
    """
    # PasswordSetupToken is not part of this module's top-level model imports;
    # importing it here keeps the view from failing with NameError.
    from models import PasswordSetupToken

    user = User.query.get(user_id)
    if not user:
        return jsonify({'message': 'User not found'}), 404

    # Generate a new password setup token
    token = PasswordSetupToken(
        user_id=user.id,
        token=generate_password_hash(str(user.id) + str(datetime.utcnow())),
        expires_at=datetime.utcnow() + timedelta(days=7)
    )
    db.session.add(token)

    # Create mail record
    # NOTE(review): the link is built from request.host_url, i.e. from the
    # Host of the incoming request -- confirm that is the intended public URL.
    mail = Mail(
        recipient=user.email,
        subject='DocuPulse Account Setup',
        body=f'Please click the following link to set up your account: {request.host_url}setup-password/{token.token}',
        status='pending'
    )
    db.session.add(mail)

    db.session.commit()
    return jsonify({'message': 'Setup mail queued for resending'})
|
|
|
|
# Connection Settings
|
|
@admin_api.route('/test-portainer-connection', methods=['POST'])
@csrf.exempt
def test_portainer_connection():
    """Check that a Portainer instance answers ``GET <url>/api/status`` with 200.

    Expects a JSON body with ``url`` and ``api_key``.  Responds 200 on
    success, 400 when a field is missing or Portainer answers with another
    status, and 500 with the exception text when the request itself fails.
    """
    # NOTE(review): this route has no token_required guard and requests a
    # caller-supplied URL -- confirm it is protected elsewhere.
    # A missing, malformed or non-object body is treated as empty so it gets
    # the 400 below instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    url = data.get('url')
    api_key = data.get('api_key')

    if not url or not api_key:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        # Test Portainer connection
        response = requests.get(
            f"{url.rstrip('/')}/api/status",
            headers={
                'X-API-Key': api_key,
                'Accept': 'application/json'
            },
            timeout=5
        )

        if response.status_code == 200:
            return jsonify({'message': 'Connection successful'})
        return jsonify({'error': 'Failed to connect to Portainer'}), 400

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@admin_api.route('/test-nginx-connection', methods=['POST'])
@csrf.exempt
def test_nginx_connection():
    """Check NGINX Proxy Manager credentials by logging in and listing proxy hosts.

    Expects a JSON body with ``url``, ``username`` and ``password``.  A token
    is requested from ``<url>/api/tokens`` and then used against
    ``<url>/api/nginx/proxy-hosts``.  Responds 200 on success, 400 for
    missing fields or a failed step, and 500 with the exception text when a
    request itself fails.
    """
    # NOTE(review): this route has no token_required guard and requests a
    # caller-supplied URL -- confirm it is protected elsewhere.
    # A missing, malformed or non-object body is treated as empty so it gets
    # the 400 below instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    url = data.get('url')
    username = data.get('username')
    password = data.get('password')

    if not url or not username or not password:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        # First, get the JWT token
        token_response = requests.post(
            f"{url.rstrip('/')}/api/tokens",
            json={
                'identity': username,
                'secret': password
            },
            headers={'Content-Type': 'application/json'},
            timeout=5
        )

        if token_response.status_code != 200:
            return jsonify({'error': 'Failed to authenticate with NGINX Proxy Manager'}), 400

        token_data = token_response.json()
        token = token_data.get('token')

        if not token:
            return jsonify({'error': 'No token received from NGINX Proxy Manager'}), 400

        # Now test the connection using the token
        response = requests.get(
            f"{url.rstrip('/')}/api/nginx/proxy-hosts",
            headers={
                'Authorization': f'Bearer {token}',
                'Accept': 'application/json'
            },
            timeout=5
        )

        if response.status_code == 200:
            return jsonify({'message': 'Connection successful'})
        return jsonify({'error': 'Failed to connect to NGINX Proxy Manager'}), 400

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@admin_api.route('/save-portainer-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_portainer_connection(current_user):
    """Store the Portainer ``url`` and ``api_key`` under the ``portainer_settings`` key.

    Responds 403 when the caller is not an admin user, 400 when a field is
    missing, and 500 with the exception text when saving fails.
    """
    # current_user is None for management-tool tokens, which have no user.
    if current_user is None or not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403

    # A missing, malformed or non-object body is treated as empty so it gets
    # the 400 below instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    url = data.get('url')
    api_key = data.get('api_key')

    if not url or not api_key:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        # Save Portainer settings
        KeyValueSettings.set_value('portainer_settings', {
            'url': url,
            'api_key': api_key
        })

        return jsonify({'message': 'Settings saved successfully'})

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@admin_api.route('/save-nginx-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_nginx_connection(current_user):
    """Store the NGINX Proxy Manager ``url``, ``username`` and ``password``.

    The values are saved under the ``nginx_settings`` key.  Responds 403 when
    the caller is not an admin user, 400 when a field is missing, and 500
    with the exception text when saving fails.
    """
    # current_user is None for management-tool tokens, which have no user.
    if current_user is None or not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403

    # A missing, malformed or non-object body is treated as empty so it gets
    # the 400 below instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    url = data.get('url')
    username = data.get('username')
    password = data.get('password')

    if not url or not username or not password:
        return jsonify({'error': 'Missing required fields'}), 400

    try:
        # Save NGINX Proxy Manager settings
        KeyValueSettings.set_value('nginx_settings', {
            'url': url,
            'username': username,
            'password': password
        })

        return jsonify({'message': 'Settings saved successfully'})

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@admin_api.route('/generate-gitea-token', methods=['POST'])
@csrf.exempt
def generate_gitea_token():
    """Generate a new Gitea API token.

    Expects a JSON body with ``url``, ``username`` and ``password`` and an
    optional ``otp`` that is forwarded in the ``X-Gitea-OTP`` header.  The
    token is requested with basic auth from
    ``<url>/api/v1/users/<username>/tokens``.

    Responds 200 with ``token``, ``name`` and ``token_last_eight`` when Gitea
    answers 201; otherwise 400 with a failure message.
    """
    # NOTE(review): this route has no token_required guard and requests a
    # caller-supplied URL -- confirm it is protected elsewhere.
    data = request.get_json()
    if not data or 'url' not in data or 'username' not in data or 'password' not in data:
        return jsonify({'message': 'Missing required fields'}), 400

    try:
        headers = {
            'Content-Type': 'application/json'
        }

        if data.get('otp'):
            headers['X-Gitea-OTP'] = data['otp']

        # Generate token with required scopes
        token_data = {
            'name': f'docupulse_{datetime.utcnow().strftime("%Y%m%d_%H%M%S")}',
            'scopes': [
                'read:activitypub',
                'read:issue',
                'write:misc',
                'read:notification',
                'read:organization',
                'read:package',
                'read:repository',
                'read:user'
            ]
        }

        # Make request to Gitea API.  The trailing slash is stripped as in the
        # other connection helpers, and the timeout keeps an unresponsive
        # server from blocking this worker indefinitely.
        response = requests.post(
            f'{data["url"].rstrip("/")}/api/v1/users/{data["username"]}/tokens',
            headers=headers,
            json=token_data,
            auth=(data['username'], data['password']),
            timeout=10
        )

        if response.status_code == 201:
            token_data = response.json()
            return jsonify({
                'token': token_data['sha1'],
                'name': token_data['name'],
                'token_last_eight': token_data['token_last_eight']
            }), 200
        return jsonify({'message': f'Failed to generate token: {response.json().get("message", "Unknown error")}'}), 400

    except Exception as e:
        return jsonify({'message': f'Failed to generate token: {str(e)}'}), 400
|
|
|
|
@admin_api.route('/list-gitea-repos', methods=['POST'])
@csrf.exempt
def list_gitea_repos():
    """List repositories from Gitea.

    Expects a JSON body with ``url`` and ``token``.  ``<url>/api/v1/user/repos``
    is first requested with the token in the ``Authorization`` header; if
    that does not return 200 the request is repeated with the token as a
    ``token`` query parameter.

    Responds 200 with ``{'repositories': [...]}`` or 400 with a failure
    message.
    """
    # NOTE(review): this route has no token_required guard and requests a
    # caller-supplied URL -- confirm it is protected elsewhere.
    data = request.get_json()
    if not data or 'url' not in data or 'token' not in data:
        return jsonify({'message': 'Missing required fields'}), 400

    # Bound every outbound call so an unresponsive server cannot block this
    # worker indefinitely.
    request_timeout = 10

    try:
        repos_url = f'{data["url"].rstrip("/")}/api/v1/user/repos'

        # First try token in Authorization header
        response = requests.get(
            repos_url,
            headers={
                'Accept': 'application/json',
                'Authorization': f'token {data["token"]}'
            },
            timeout=request_timeout
        )

        # If that fails, try token as query parameter (passed via params so
        # it is URL-encoded correctly)
        if response.status_code != 200:
            response = requests.get(
                repos_url,
                params={'token': data['token']},
                headers={'Accept': 'application/json'},
                timeout=request_timeout
            )

        if response.status_code == 200:
            return jsonify({
                'repositories': response.json()
            }), 200
        return jsonify({'message': f'Failed to list repositories: {response.json().get("message", "Unknown error")}'}), 400

    except Exception as e:
        return jsonify({'message': f'Failed to list repositories: {str(e)}'}), 400
|
|
|
|
@admin_api.route('/list-gitea-branches', methods=['POST'])
@csrf.exempt
def list_gitea_branches():
    """List branches from a Gitea repository.

    Expects a JSON body with ``url``, ``token`` and ``repo``.
    ``<url>/api/v1/repos/<repo>/branches`` is first requested with the token
    in the ``Authorization`` header; if that does not return 200 the request
    is repeated with the token as a ``token`` query parameter.

    Responds 200 with ``{'branches': [...]}`` or 400 with a failure message.
    """
    # NOTE(review): this route has no token_required guard and requests a
    # caller-supplied URL -- confirm it is protected elsewhere.
    data = request.get_json()
    if not data or 'url' not in data or 'token' not in data or 'repo' not in data:
        return jsonify({'message': 'Missing required fields'}), 400

    # Bound every outbound call so an unresponsive server cannot block this
    # worker indefinitely.
    request_timeout = 10

    try:
        branches_url = f'{data["url"].rstrip("/")}/api/v1/repos/{data["repo"]}/branches'

        # First try token in Authorization header
        response = requests.get(
            branches_url,
            headers={
                'Accept': 'application/json',
                'Authorization': f'token {data["token"]}'
            },
            timeout=request_timeout
        )

        # If that fails, try token as query parameter (passed via params so
        # it is URL-encoded correctly)
        if response.status_code != 200:
            response = requests.get(
                branches_url,
                params={'token': data['token']},
                headers={'Accept': 'application/json'},
                timeout=request_timeout
            )

        if response.status_code == 200:
            return jsonify({
                'branches': response.json()
            }), 200
        return jsonify({'message': f'Failed to list branches: {response.json().get("message", "Unknown error")}'}), 400

    except Exception as e:
        return jsonify({'message': f'Failed to list branches: {str(e)}'}), 400
|
|
|
|
@admin_api.route('/list-gitlab-repos', methods=['POST'])
@csrf.exempt
def list_gitlab_repos():
    """List repositories from GitLab.

    Expects a JSON body with ``url`` and ``token``.  Requests
    ``<url>/api/v4/projects`` with ``membership=true`` and the token in the
    ``PRIVATE-TOKEN`` header.

    Responds 200 with ``{'repositories': [...]}`` or 400 with a failure
    message.
    """
    # NOTE(review): this route has no token_required guard and requests a
    # caller-supplied URL -- confirm it is protected elsewhere.
    data = request.get_json()
    if not data or 'url' not in data or 'token' not in data:
        return jsonify({'message': 'Missing required fields'}), 400

    try:
        headers = {
            'PRIVATE-TOKEN': data['token'],
            'Accept': 'application/json'
        }

        # Get user's projects (repositories).  The timeout keeps an
        # unresponsive server from blocking this worker indefinitely.
        response = requests.get(
            f'{data["url"].rstrip("/")}/api/v4/projects',
            headers=headers,
            params={'membership': 'true'},  # Only get projects where user is a member
            timeout=10
        )

        if response.status_code == 200:
            return jsonify({
                'repositories': response.json()
            }), 200
        return jsonify({'message': f'Failed to list repositories: {response.json().get("message", "Unknown error")}'}), 400

    except Exception as e:
        return jsonify({'message': f'Failed to list repositories: {str(e)}'}), 400
|
|
|
|
@admin_api.route('/test-git-connection', methods=['POST'])
@csrf.exempt
def test_git_connection():
    """Test the connection to a Git repository.

    Expects a JSON body with ``provider`` (``gitea`` or ``gitlab``), ``url``,
    ``username``, ``token`` and ``repo``.

    * gitea: requests ``<url>/api/v1/repos/<repo>`` with the token in the
      ``Authorization`` header, retrying with a ``token`` query parameter if
      the first attempt does not return 200.
    * gitlab: requests ``<url>/api/v4/projects/<repo>`` with ``/`` in the
      repo path replaced by ``%2F`` and the token in ``PRIVATE-TOKEN``.

    Responds 200 when the provider answers 200, otherwise 400 with a message.
    """
    # NOTE(review): this route has no token_required guard and requests a
    # caller-supplied URL -- confirm it is protected elsewhere.
    data = request.get_json()
    if not data or 'provider' not in data or 'url' not in data or 'username' not in data or 'token' not in data or 'repo' not in data:
        return jsonify({'message': 'Missing required fields'}), 400

    # Bound every outbound call so an unresponsive server cannot block this
    # worker indefinitely.
    request_timeout = 10

    try:
        base_url = data['url'].rstrip('/')

        if data['provider'] == 'gitea':
            repo_url = f'{base_url}/api/v1/repos/{data["repo"]}'

            # First try token in Authorization header
            response = requests.get(
                repo_url,
                headers={
                    'Accept': 'application/json',
                    'Authorization': f'token {data["token"]}'
                },
                timeout=request_timeout
            )

            # If that fails, try token as query parameter (passed via params
            # so it is URL-encoded correctly)
            if response.status_code != 200:
                response = requests.get(
                    repo_url,
                    params={'token': data['token']},
                    headers={'Accept': 'application/json'},
                    timeout=request_timeout
                )

        elif data['provider'] == 'gitlab':
            # GitLab addresses a project by its path with "/" encoded as %2F.
            response = requests.get(
                f'{base_url}/api/v4/projects/{data["repo"].replace("/", "%2F")}',
                headers={
                    'PRIVATE-TOKEN': data['token'],
                    'Accept': 'application/json'
                },
                timeout=request_timeout
            )

        else:
            return jsonify({'message': 'Invalid Git provider'}), 400

        if response.status_code == 200:
            return jsonify({'message': 'Connection successful'}), 200
        return jsonify({'message': f'Connection failed: {response.json().get("message", "Unknown error")}'}), 400

    except Exception as e:
        return jsonify({'message': f'Connection failed: {str(e)}'}), 400
|
|
|
|
@admin_api.route('/save-git-connection', methods=['POST'])
@csrf.exempt
@token_required
def save_git_connection(current_user):
    """Store the Git connection settings under the ``git_settings`` key.

    Expects a JSON body with ``provider`` (``gitea`` or ``gitlab``), ``url``,
    ``username``, ``token`` and ``repo``.  Responds 403 when the caller is
    not an admin user, 400 for a missing field or unknown provider, and 500
    with the exception text when saving fails.
    """
    # current_user is None for management-tool tokens, which have no user.
    if current_user is None or not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403

    # A missing, malformed or non-object body is treated as empty so it gets
    # the 400 below instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    provider = data.get('provider')
    url = data.get('url')
    username = data.get('username')
    token = data.get('token')
    repo = data.get('repo')

    if not provider or not url or not username or not token or not repo:
        return jsonify({'error': 'Missing required fields'}), 400

    if provider not in ['gitea', 'gitlab']:
        return jsonify({'error': 'Invalid provider'}), 400

    try:
        # Save Git settings
        KeyValueSettings.set_value('git_settings', {
            'provider': provider,
            'url': url,
            'username': username,
            'token': token,
            'repo': repo
        })

        return jsonify({'message': 'Settings saved successfully'})

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@admin_api.route('/create-proxy-host', methods=['POST'])
@csrf.exempt
@token_required
def create_proxy_host(current_user):
    """Create a proxy host in NGINX Proxy Manager using the saved ``nginx_settings``.

    Expects a JSON body with ``domains``, ``forward_ip``, ``forward_port``
    and an optional ``scheme`` (default ``http``).  The view logs in at
    ``<nginx url>/api/tokens`` and posts the host definition to
    ``<nginx url>/api/nginx/proxy-hosts``.

    Responds 403 when the caller is not an admin user; 400 for missing
    fields, a non-integer port, unconfigured settings or a failed upstream
    step; 500 with the exception text on unexpected errors.
    """
    # current_user is None for management-tool tokens, which have no user.
    if current_user is None or not current_user.is_admin:
        return jsonify({'error': 'Unauthorized'}), 403

    # A missing, malformed or non-object body is treated as empty so it gets
    # the 400 below instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    domains = data.get('domains')
    scheme = data.get('scheme', 'http')
    forward_ip = data.get('forward_ip')
    forward_port = data.get('forward_port')

    if not domains or not forward_ip or not forward_port:
        return jsonify({'error': 'Missing required fields'}), 400

    # A port that is not an integer is a client error, not a server error.
    try:
        forward_port = int(forward_port)
    except (TypeError, ValueError):
        return jsonify({'error': 'Invalid forward_port'}), 400

    try:
        # Get NGINX settings
        nginx_settings = KeyValueSettings.get_value('nginx_settings')
        if not nginx_settings:
            return jsonify({'error': 'NGINX settings not configured'}), 400

        base_url = nginx_settings['url'].rstrip('/')

        # First, get the JWT token
        token_response = requests.post(
            f"{base_url}/api/tokens",
            json={
                'identity': nginx_settings['username'],
                'secret': nginx_settings['password']
            },
            headers={'Content-Type': 'application/json'},
            timeout=5
        )

        if token_response.status_code != 200:
            return jsonify({'error': 'Failed to authenticate with NGINX Proxy Manager'}), 400

        token_data = token_response.json()
        token = token_data.get('token')

        if not token:
            return jsonify({'error': 'No token received from NGINX Proxy Manager'}), 400

        # Create the proxy host
        proxy_host_data = {
            'domain_names': domains,
            'forward_scheme': scheme,
            'forward_host': forward_ip,
            'forward_port': forward_port,
            'ssl_forced': True,
            'caching_enabled': True,
            'block_exploits': True,
            'allow_websocket_upgrade': True,
            'http2_support': True,
            'hsts_enabled': True,
            'hsts_subdomains': True,
            'meta': {
                'letsencrypt_agree': True,
                'dns_challenge': False
            }
        }

        response = requests.post(
            f"{base_url}/api/nginx/proxy-hosts",
            json=proxy_host_data,
            headers={
                'Authorization': f'Bearer {token}',
                'Content-Type': 'application/json',
                'Accept': 'application/json'
            },
            timeout=5
        )

        # NOTE(review): creation endpoints commonly answer 201 Created, so it
        # is accepted alongside 200 -- confirm against the NPM API in use.
        if response.status_code in (200, 201):
            return jsonify({
                'message': 'Proxy host created successfully',
                'data': response.json()
            })

        error_data = response.json()
        return jsonify({
            'error': f'Failed to create proxy host: {error_data.get("message", "Unknown error")}'
        }), 400

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@admin_api.route('/create-ssl-certificate', methods=['POST'])
@csrf.exempt
@token_required
def create_ssl_certificate(current_user):
    """Request a Let's Encrypt certificate and attach it to a proxy host.

    Expects a JSON body with the keys ``domains``, ``proxy_host_id`` and
    ``nginx_url``. The view authenticates against NGINX Proxy Manager using
    the stored ``nginx_settings``, creates the certificate, then updates the
    given proxy host so that it references the new certificate.

    Args:
        current_user: Value injected by ``token_required`` (``None`` for
            management-tool tokens). It is not used by this view.

    Returns:
        A JSON response carrying a ``success`` flag. On success, ``data``
        holds the upstream ``certificate`` and ``proxy_host`` payloads.
        Validation, configuration and authentication problems return 400;
        upstream failures are relayed with the upstream status code; any
        unexpected error returns 500.
    """
    # Outbound calls must never block a worker forever. Certificate issuance
    # involves an ACME round trip, so it gets a much longer read timeout than
    # the plain API calls. Tuples are (connect, read) seconds.
    token_timeout = 5
    certificate_timeout = (5, 120)
    update_timeout = (5, 30)

    try:
        # silent=True: a missing or malformed JSON body is reported below as
        # a 400 "missing fields" error instead of surfacing as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        current_app.logger.info(f"Received request data: {data}")

        domains = data.get('domains')
        proxy_host_id = data.get('proxy_host_id')
        nginx_url = data.get('nginx_url')

        current_app.logger.info(
            f"Extracted data - domains: {domains}, "
            f"proxy_host_id: {proxy_host_id}, nginx_url: {nginx_url}"
        )

        required_fields = {
            'domains': domains,
            'proxy_host_id': proxy_host_id,
            'nginx_url': nginx_url,
        }
        missing_fields = [
            field for field, value in required_fields.items() if not value
        ]
        if missing_fields:
            current_app.logger.error(f"Missing required fields: {missing_fields}")
            return jsonify({
                'success': False,
                'error': f'Missing required fields: {", ".join(missing_fields)}'
            }), 400

        # Avoid "//api/..." when the caller passes a URL with a trailing slash.
        nginx_url = str(nginx_url).rstrip('/')

        # Get NGINX settings
        nginx_settings = KeyValueSettings.get_value('nginx_settings')
        if not nginx_settings:
            return jsonify({
                'success': False,
                'error': 'NGINX settings not configured'
            }), 400

        # First, get the JWT token
        token_response = requests.post(
            f"{nginx_settings['url'].rstrip('/')}/api/tokens",
            json={
                'identity': nginx_settings['username'],
                'secret': nginx_settings['password']
            },
            headers={'Content-Type': 'application/json'},
            timeout=token_timeout
        )

        if token_response.status_code != 200:
            return jsonify({
                'success': False,
                'error': 'Failed to authenticate with NGINX Proxy Manager'
            }), 400

        token = token_response.json().get('token')
        if not token:
            return jsonify({
                'success': False,
                'error': 'No token received from NGINX Proxy Manager'
            }), 400

        # NOTE(review): the token is issued by the configured
        # nginx_settings['url'] but is sent to the caller-supplied nginx_url
        # below. Confirm that both are always the same host.
        auth_headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }

        # Create the SSL certificate
        ssl_request_data = {
            'provider': 'letsencrypt',
            'domain_names': domains,
            'meta': {
                'letsencrypt_agree': True,
                'dns_challenge': False
            }
        }
        current_app.logger.info(
            f"Making SSL certificate request to {nginx_url}/api/nginx/ssl "
            f"with data: {ssl_request_data}"
        )

        ssl_response = requests.post(
            f"{nginx_url}/api/nginx/ssl",
            headers=auth_headers,
            json=ssl_request_data,
            timeout=certificate_timeout
        )

        current_app.logger.info(f"SSL certificate response status: {ssl_response.status_code}")
        current_app.logger.info(f"SSL certificate response headers: {dict(ssl_response.headers)}")

        if not ssl_response.ok:
            error_text = ssl_response.text
            current_app.logger.error(f"Failed to create SSL certificate: {error_text}")
            return jsonify({
                'success': False,
                'error': f'Failed to create SSL certificate: {error_text}'
            }), ssl_response.status_code

        ssl_data = ssl_response.json()
        current_app.logger.info(f"SSL certificate created successfully: {ssl_data}")

        # Get the certificate ID
        cert_id = ssl_data.get('id')
        if not cert_id:
            current_app.logger.error("No certificate ID received in response")
            return jsonify({
                'success': False,
                'error': 'No certificate ID received'
            }), 500

        # Update the proxy host with the certificate
        update_request_data = {
            'ssl_certificate_id': cert_id
        }
        current_app.logger.info(
            f"Updating proxy host {proxy_host_id} with data: {update_request_data}"
        )

        update_response = requests.put(
            f"{nginx_url}/api/nginx/proxy-hosts/{proxy_host_id}",
            headers=auth_headers,
            json=update_request_data,
            timeout=update_timeout
        )

        current_app.logger.info(f"Update response status: {update_response.status_code}")
        current_app.logger.info(f"Update response headers: {dict(update_response.headers)}")

        if not update_response.ok:
            error_text = update_response.text
            current_app.logger.error(f"Failed to update proxy host: {error_text}")
            return jsonify({
                'success': False,
                'error': f'Failed to update proxy host: {error_text}'
            }), update_response.status_code

        update_data = update_response.json()
        current_app.logger.info(f"Proxy host updated successfully: {update_data}")

        return jsonify({
            'success': True,
            'data': {
                'certificate': ssl_data,
                'proxy_host': update_data
            }
        })

    except Exception as e:
        # Top-level boundary for this view: keep the traceback in the log and
        # report the failure to the caller as a 500.
        current_app.logger.exception(f"Error in create_ssl_certificate: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500