from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify from flask_login import login_required, current_user from models import db, User, Notif from forms import UserForm from flask import abort from sqlalchemy import or_ from routes.auth import require_password_change from utils import log_event, create_notification, get_unread_count import json import os from werkzeug.utils import secure_filename from datetime import datetime contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts') UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads', 'profile_pics') if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) @contacts_bp.context_processor def inject_unread_notifications(): if current_user.is_authenticated: unread_count = get_unread_count(current_user.id) return {'unread_notifications': unread_count} return {'unread_notifications': 0} def admin_required(): if not current_user.is_authenticated: return redirect(url_for('auth.login')) if not current_user.is_admin: flash('You must be an admin to access this page.', 'error') return redirect(url_for('main.dashboard')) @contacts_bp.route('/') @login_required @require_password_change def contacts_list(): result = admin_required() if result: return result # Get query parameters page = request.args.get('page', 1, type=int) per_page = 10 # Number of items per page search = request.args.get('search', '') status = request.args.get('status', '') role = request.args.get('role', '') # Start with base query query = User.query # Apply search filter if search: search_term = f"%{search}%" query = query.filter( or_( User.username.ilike(search_term), User.last_name.ilike(search_term), User.email.ilike(search_term), User.company.ilike(search_term), User.position.ilike(search_term) ) ) # Apply status filter if status == 'active': query = query.filter(User.is_active == True) elif status == 'inactive': query = query.filter(User.is_active == False) # Apply role filter if role == 'admin': query = query.filter(User.is_admin == True) elif role == 'user': query = query.filter(User.is_admin == False) # Order by creation date query = query.order_by(User.created_at.desc()) # Get pagination pagination = query.paginate(page=page, per_page=per_page, error_out=False) users = pagination.items return render_template('contacts/list.html', users=users, pagination=pagination, current_user=current_user) @contacts_bp.route('/new', methods=['GET', 'POST']) @login_required @require_password_change def new_contact(): result = admin_required() if result: return result form = UserForm() total_admins = User.query.filter_by(is_admin=True).count() if request.method == 'GET': form.is_admin.data = False # Ensure admin role is unchecked by default elif request.method == 'POST' and 'is_admin' not in request.form: form.is_admin.data = False # Explicitly set to False if not present in POST if form.validate_on_submit(): # Check if a user with this email already exists existing_user = User.query.filter_by(email=form.email.data).first() if existing_user: flash('A user with this email already exists.', 'error') return render_template('contacts/form.html', form=form, title='New User', total_admins=total_admins) # Handle profile picture upload profile_picture = None file = request.files.get('profile_picture') if file and file.filename: filename = secure_filename(file.filename) file_path = os.path.join(UPLOAD_FOLDER, filename) file.save(file_path) profile_picture = filename # Create new user account user = User( username=form.first_name.data, last_name=form.last_name.data, email=form.email.data, phone=form.phone.data, company=form.company.data, position=form.position.data, notes=form.notes.data, is_active=True, # Set default value is_admin=form.is_admin.data, profile_picture=profile_picture ) user.set_password('changeme') # Set default password db.session.add(user) db.session.commit() # Create notification for the new user create_notification( notif_type='account_created', user_id=user.id, sender_id=current_user.id, # Admin who created the account details={ 'message': 'Your DocuPulse account has been created by an administrator.', 'username': user.username, 'email': user.email, 'created_by': f"{current_user.username} {current_user.last_name}", 'timestamp': datetime.utcnow().isoformat() } ) # Log user creation event log_event( event_type='user_create', details={ 'created_by': current_user.id, 'created_by_name': f"{current_user.username} {current_user.last_name}", 'user_id': user.id, 'user_name': f"{user.username} {user.last_name}", 'email': user.email, 'is_admin': user.is_admin, 'method': 'admin_creation' } ) db.session.commit() flash('User created successfully! They will need to change their password on first login.', 'success') return redirect(url_for('contacts.contacts_list')) return render_template('contacts/form.html', form=form, title='New User', total_admins=total_admins) @contacts_bp.route('/profile/edit', methods=['GET', 'POST']) @login_required @require_password_change def edit_profile(): form = UserForm() total_admins = User.query.filter_by(is_admin=True).count() if form.validate_on_submit(): # Check if trying to remove admin status from the only admin if not form.is_admin.data and current_user.is_admin: if total_admins <= 1: flash('There must be at least one admin user in the system.', 'error') return render_template('contacts/form.html', form=form, title='Edit Profile', total_admins=total_admins) # Store old values for comparison old_values = { 'user_name': f"{current_user.username} {current_user.last_name}", 'email': current_user.email, 'phone': current_user.phone, 'company': current_user.company, 'position': current_user.position, 'is_admin': current_user.is_admin } current_user.username = form.first_name.data current_user.last_name = form.last_name.data current_user.email = form.email.data current_user.phone = form.phone.data current_user.company = form.company.data current_user.position = form.position.data current_user.notes = form.notes.data current_user.is_admin = form.is_admin.data # Set password if provided password_changed = False if form.new_password.data: current_user.set_password(form.new_password.data) password_changed = True db.session.commit() # Log profile update event log_event( event_type='user_update', details={ 'user_id': current_user.id, 'user_name': f"{current_user.username} {current_user.last_name}", 'updated_by': current_user.id, 'updated_by_name': f"{current_user.username} {current_user.last_name}", 'old_values': old_values, 'new_values': { 'username': current_user.username, 'last_name': current_user.last_name, 'email': current_user.email, 'phone': current_user.phone, 'company': current_user.company, 'position': current_user.position, 'is_admin': current_user.is_admin }, 'password_changed': password_changed, 'method': 'self_update' } ) db.session.commit() flash('Profile updated successfully!', 'success') return redirect(url_for('contacts.contacts_list')) # Pre-fill the form with current user data if request.method == 'GET': form.first_name.data = current_user.username form.last_name.data = current_user.last_name form.email.data = current_user.email form.phone.data = current_user.phone form.company.data = current_user.company form.position.data = current_user.position form.notes.data = current_user.notes form.is_admin.data = current_user.is_admin return render_template('contacts/form.html', form=form, title='Edit Profile', total_admins=total_admins) @contacts_bp.route('//edit', methods=['GET', 'POST']) @login_required @require_password_change def edit_contact(id): result = admin_required() if result: return result total_admins = User.query.filter_by(is_admin=True).count() user = User.query.get_or_404(id) form = UserForm() if request.method == 'GET': form.first_name.data = user.username form.last_name.data = user.last_name form.email.data = user.email form.phone.data = user.phone form.company.data = user.company form.position.data = user.position form.notes.data = user.notes form.is_admin.data = user.is_admin if form.validate_on_submit(): # Handle profile picture removal if 'remove_picture' in request.form: if user.profile_picture: # Delete the old profile picture file old_picture_path = os.path.join(UPLOAD_FOLDER, user.profile_picture) if os.path.exists(old_picture_path): os.remove(old_picture_path) user.profile_picture = None db.session.commit() flash('Profile picture removed successfully!', 'success') return redirect(url_for('contacts.edit_contact', id=user.id)) # Handle profile picture upload file = request.files.get('profile_picture') if file and file.filename: # Delete old profile picture if it exists if user.profile_picture: old_picture_path = os.path.join(UPLOAD_FOLDER, user.profile_picture) if os.path.exists(old_picture_path): os.remove(old_picture_path) filename = secure_filename(file.filename) file_path = os.path.join(UPLOAD_FOLDER, filename) file.save(file_path) user.profile_picture = filename # Prevent removing admin from the last admin if not form.is_admin.data and user.is_admin and total_admins <= 1: flash('There must be at least one admin user in the system.', 'error') return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user) # Check if the new email is already used by another user if form.email.data != user.email: existing_user = User.query.filter_by(email=form.email.data).first() if existing_user: flash('A user with this email already exists.', 'error') return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user) # Store old values for comparison old_values = { 'user_name': f"{user.username} {user.last_name}", 'email': user.email, 'phone': user.phone, 'company': user.company, 'position': user.position, 'is_admin': user.is_admin } user.username = form.first_name.data user.last_name = form.last_name.data user.email = form.email.data user.phone = form.phone.data user.company = form.company.data user.position = form.position.data user.notes = form.notes.data user.is_admin = form.is_admin.data # Set password if provided password_changed = False if form.new_password.data: user.set_password(form.new_password.data) password_changed = True db.session.commit() # Create notification for the user being updated create_notification( notif_type='account_updated', user_id=user.id, sender_id=current_user.id, details={ 'message': 'Your account has been updated by an administrator.', 'updated_by': f"{current_user.username} {current_user.last_name}", 'changes': { 'name': f"{user.username} {user.last_name}", 'email': user.email, 'phone': user.phone, 'company': user.company, 'position': user.position, 'password_changed': password_changed }, 'timestamp': datetime.utcnow().isoformat() } ) # Log user update event log_event( event_type='user_update', details={ 'user_id': user.id, 'user_name': f"{user.username} {user.last_name}", 'updated_by': current_user.id, 'updated_by_name': f"{current_user.username} {current_user.last_name}", 'old_values': old_values, 'new_values': { 'user_name': f"{user.username} {user.last_name}", 'email': user.email, 'phone': user.phone, 'company': user.company, 'position': user.position, 'is_admin': user.is_admin }, 'password_changed': password_changed, 'method': 'admin_update' } ) db.session.commit() flash('User updated successfully!', 'success') return redirect(url_for('contacts.contacts_list')) return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user) @contacts_bp.route('//delete', methods=['POST']) @login_required @require_password_change def delete_contact(id): result = admin_required() if result: return result user = User.query.get_or_404(id) if user.email == current_user.email: flash('You cannot delete your own account.', 'error') return redirect(url_for('contacts.contacts_list')) # Create notification for the user being deleted create_notification( notif_type='account_deleted', user_id=user.id, sender_id=current_user.id, details={ 'message': 'Your account has been deleted by an administrator.', 'deleted_by': f"{current_user.username} {current_user.last_name}", 'timestamp': datetime.utcnow().isoformat() } ) # Log user deletion event log_event( event_type='user_delete', details={ 'user_id': user.id, 'user_name': f"{user.username} {user.last_name}", 'deleted_by': current_user.id, 'deleted_by_name': f"{current_user.username} {current_user.last_name}", 'email': user.email, 'is_admin': user.is_admin } ) db.session.commit() db.session.delete(user) db.session.commit() flash('User deleted successfully!', 'success') return redirect(url_for('contacts.contacts_list')) @contacts_bp.route('//toggle-active', methods=['POST']) @login_required @require_password_change def toggle_active(id): result = admin_required() if result: return result user = User.query.get_or_404(id) if user.email == current_user.email: flash('You cannot deactivate your own account.', 'error') return redirect(url_for('contacts.contacts_list')) old_status = user.is_active user.is_active = not user.is_active # Log status change event log_event( event_type='user_update', details={ 'user_id': user.id, 'user_name': f"{user.username} {user.last_name}", 'updated_by': current_user.id, 'updated_by_name': f"{current_user.username} {current_user.last_name}", 'update_type': 'status_change', 'old_status': old_status, 'new_status': user.is_active, 'email': user.email } ) db.session.commit() flash(f'User marked as {"active" if user.is_active else "inactive"}!', 'success') return redirect(url_for('contacts.contacts_list'))