Files
docupulse/routes/contacts.py

395 lines
15 KiB
Python

from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_required, current_user
from models import db, User
from forms import UserForm
from flask import abort
from sqlalchemy import or_
from routes.auth import require_password_change
from utils import log_event
import json
import os
from werkzeug.utils import secure_filename
# Blueprint grouping all user/contact management views under /contacts.
contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts')

# Directory where uploaded profile pictures are stored; resolved against the
# process working directory at import time.
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads', 'profile_pics')

# exist_ok=True avoids the check-then-create race that occurs when several
# worker processes import this module at the same moment.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
def admin_required():
    """Gate a view behind admin rights.

    Returns:
        A redirect response when the current user may not proceed
        (to the login page when unauthenticated, to the dashboard with an
        error flash when authenticated but not an admin), or ``None`` when
        the current user is an authenticated admin.
    """
    if not current_user.is_authenticated:
        target = 'auth.login'
    elif current_user.is_admin:
        # Caller treats a falsy result as "access granted".
        return None
    else:
        flash('You must be an admin to access this page.', 'error')
        target = 'main.dashboard'
    return redirect(url_for(target))
@contacts_bp.route('/')
@login_required
@require_password_change
def contacts_list():
    """Render the paginated, filterable list of users (admin only).

    Query-string parameters read from the request:
        page:   page number, parsed as int, defaults to 1.
        search: substring matched case-insensitively against username,
                last name, email, company and position.
        status: 'active' or 'inactive' to filter on ``User.is_active``.
        role:   'admin' or 'user' to filter on ``User.is_admin``.

    Any other value for ``status`` / ``role`` leaves that filter off.
    """
    denied = admin_required()
    if denied:
        return denied

    args = request.args
    page = args.get('page', 1, type=int)
    search = args.get('search', '')
    status = args.get('status', '')
    role = args.get('role', '')

    query = User.query

    # Free-text search across the identifying columns.
    if search:
        pattern = f"%{search}%"
        searchable = (
            User.username,
            User.last_name,
            User.email,
            User.company,
            User.position,
        )
        query = query.filter(or_(*[column.ilike(pattern) for column in searchable]))

    # Map the accepted filter keywords onto the boolean they select.
    status_flags = {'active': True, 'inactive': False}
    if status in status_flags:
        query = query.filter(User.is_active == status_flags[status])

    role_flags = {'admin': True, 'user': False}
    if role in role_flags:
        query = query.filter(User.is_admin == role_flags[role])

    # Newest accounts first, ten per page; out-of-range pages do not 404.
    pagination = query.order_by(User.created_at.desc()).paginate(
        page=page,
        per_page=10,
        error_out=False,
    )

    return render_template(
        'contacts/list.html',
        users=pagination.items,
        pagination=pagination,
        current_user=current_user,
    )
@contacts_bp.route('/new', methods=['GET', 'POST'])
@login_required
@require_password_change
def new_contact():
    """Create a new user account (admin only).

    GET renders the empty user form. A valid POST rejects duplicate emails,
    stores an optional uploaded profile picture, creates the user with the
    default password, records a ``user_create`` event and redirects to the
    contact list. An invalid POST re-renders the form.

    Returns:
        The rendered form template or a redirect response.
    """
    # Local import: only needed to build collision-free upload names.
    import uuid

    result = admin_required()
    if result:
        return result

    form = UserForm()
    total_admins = User.query.filter_by(is_admin=True).count()

    if request.method == 'GET':
        # Ensure the admin role is unchecked by default.
        form.is_admin.data = False
    elif request.method == 'POST' and 'is_admin' not in request.form:
        # Explicitly set to False if the checkbox is not present in the POST.
        form.is_admin.data = False

    if form.validate_on_submit():
        # Check if a user with this email already exists.
        existing_user = User.query.filter_by(email=form.email.data).first()
        if existing_user:
            flash('A user with this email already exists.', 'error')
            return render_template('contacts/form.html', form=form, title='New User', total_admins=total_admins)

        # Handle profile picture upload.
        profile_picture = None
        file = request.files.get('profile_picture')
        if file and file.filename:
            safe_name = secure_filename(file.filename)
            if not safe_name:
                # secure_filename() can strip a name down to nothing; saving
                # would then target the upload directory itself and fail.
                flash('Invalid profile picture file name.', 'error')
                return render_template('contacts/form.html', form=form, title='New User', total_admins=total_admins)
            # Prefix with a random token so two users uploading the same
            # file name never overwrite (or later delete) each other's file.
            filename = f"{uuid.uuid4().hex[:12]}_{safe_name}"
            file.save(os.path.join(UPLOAD_FOLDER, filename))
            profile_picture = filename

        # Create the new user account.
        user = User(
            username=form.first_name.data,
            last_name=form.last_name.data,
            email=form.email.data,
            phone=form.phone.data,
            company=form.company.data,
            position=form.position.data,
            notes=form.notes.data,
            is_active=True,  # new accounts start active
            is_admin=form.is_admin.data,
            profile_picture=profile_picture,
        )
        # NOTE(review): every new account shares this well-known default
        # password until first login; consider a random one-time password.
        user.set_password('changeme')
        db.session.add(user)
        # Commit first so user.id is populated for the audit event below.
        db.session.commit()

        # Log user creation event.
        log_event(
            event_type='user_create',
            details={
                'created_by': current_user.id,
                'created_by_name': f"{current_user.username} {current_user.last_name}",
                'user_id': user.id,
                'user_name': f"{user.username} {user.last_name}",
                'email': user.email,
                'is_admin': user.is_admin,
                'method': 'admin_creation'
            }
        )
        db.session.commit()

        flash('User created successfully! They will need to change their password on first login.', 'success')
        return redirect(url_for('contacts.contacts_list'))

    return render_template('contacts/form.html', form=form, title='New User', total_admins=total_admins)
@contacts_bp.route('/profile/edit', methods=['GET', 'POST'])
@login_required
@require_password_change
def edit_profile():
    """Let the logged-in user edit their own profile.

    GET pre-fills the form with the current user's data. A valid POST
    updates the profile (and the password when a new one is supplied),
    records a ``user_update`` event and redirects. The admin flag can only
    be changed by a user who already is an admin, and the last remaining
    admin cannot demote themselves.

    Returns:
        The rendered form template or a redirect response.
    """
    form = UserForm()
    total_admins = User.query.filter_by(is_admin=True).count()

    if form.validate_on_submit():
        # Security: this view is open to every logged-in user, so the
        # submitted is_admin value is only honoured for existing admins.
        # Otherwise any user could promote themselves by POSTing is_admin.
        if current_user.is_admin:
            new_is_admin = form.is_admin.data
        else:
            new_is_admin = current_user.is_admin

        # Check if trying to remove admin status from the only admin.
        if current_user.is_admin and not new_is_admin and total_admins <= 1:
            flash('There must be at least one admin user in the system.', 'error')
            return render_template('contacts/form.html', form=form, title='Edit Profile', total_admins=total_admins)

        # Check if the new email is already used by another user.
        if form.email.data != current_user.email:
            existing_user = User.query.filter_by(email=form.email.data).first()
            if existing_user and existing_user.id != current_user.id:
                flash('A user with this email already exists.', 'error')
                return render_template('contacts/form.html', form=form, title='Edit Profile', total_admins=total_admins)

        # Store old values for comparison in the audit event.
        old_values = {
            'user_name': f"{current_user.username} {current_user.last_name}",
            'email': current_user.email,
            'phone': current_user.phone,
            'company': current_user.company,
            'position': current_user.position,
            'is_admin': current_user.is_admin
        }

        current_user.username = form.first_name.data
        current_user.last_name = form.last_name.data
        current_user.email = form.email.data
        current_user.phone = form.phone.data
        current_user.company = form.company.data
        current_user.position = form.position.data
        current_user.notes = form.notes.data
        current_user.is_admin = new_is_admin

        # Set password if provided.
        password_changed = False
        if form.new_password.data:
            current_user.set_password(form.new_password.data)
            password_changed = True

        db.session.commit()

        # Log profile update event.
        log_event(
            event_type='user_update',
            details={
                'user_id': current_user.id,
                'user_name': f"{current_user.username} {current_user.last_name}",
                'updated_by': current_user.id,
                'updated_by_name': f"{current_user.username} {current_user.last_name}",
                'old_values': old_values,
                'new_values': {
                    'username': current_user.username,
                    'last_name': current_user.last_name,
                    'email': current_user.email,
                    'phone': current_user.phone,
                    'company': current_user.company,
                    'position': current_user.position,
                    'is_admin': current_user.is_admin
                },
                'password_changed': password_changed,
                'method': 'self_update'
            }
        )
        db.session.commit()

        flash('Profile updated successfully!', 'success')
        # The contact list is admin-only; sending a regular user there would
        # bounce them to the dashboard with a spurious "must be an admin"
        # error, so send them to the dashboard directly.
        if current_user.is_admin:
            return redirect(url_for('contacts.contacts_list'))
        return redirect(url_for('main.dashboard'))

    # Pre-fill the form with current user data.
    if request.method == 'GET':
        form.first_name.data = current_user.username
        form.last_name.data = current_user.last_name
        form.email.data = current_user.email
        form.phone.data = current_user.phone
        form.company.data = current_user.company
        form.position.data = current_user.position
        form.notes.data = current_user.notes
        form.is_admin.data = current_user.is_admin

    return render_template('contacts/form.html', form=form, title='Edit Profile', total_admins=total_admins)
def _remove_picture_file(filename):
    """Delete a stored profile picture from UPLOAD_FOLDER, ignoring a missing file."""
    try:
        os.remove(os.path.join(UPLOAD_FOLDER, filename))
    except FileNotFoundError:
        # Already gone; nothing to clean up.
        pass


@contacts_bp.route('/<int:id>/edit', methods=['GET', 'POST'])
@login_required
@require_password_change
def edit_contact(id):
    """Edit an existing user (admin only).

    GET pre-fills the form with the user's data. A valid POST either
    removes the profile picture (when ``remove_picture`` is submitted) or
    updates the user's fields, optional new picture and optional new
    password, then records a ``user_update`` event.

    Args:
        id: Primary key of the user to edit; responds 404 when not found.

    Returns:
        The rendered form template or a redirect response.
    """
    # Local import: only needed to build collision-free upload names.
    import uuid

    result = admin_required()
    if result:
        return result

    total_admins = User.query.filter_by(is_admin=True).count()
    user = User.query.get_or_404(id)
    form = UserForm()

    if request.method == 'GET':
        form.first_name.data = user.username
        form.last_name.data = user.last_name
        form.email.data = user.email
        form.phone.data = user.phone
        form.company.data = user.company
        form.position.data = user.position
        form.notes.data = user.notes
        form.is_admin.data = user.is_admin

    if form.validate_on_submit():
        # Handle profile picture removal.
        if 'remove_picture' in request.form:
            if user.profile_picture:
                removed_picture = user.profile_picture
                user.profile_picture = None
                db.session.commit()
                # Delete the file only after the database no longer
                # references it.
                _remove_picture_file(removed_picture)
                flash('Profile picture removed successfully!', 'success')
            return redirect(url_for('contacts.edit_contact', id=user.id))

        # Run every validation BEFORE touching files on disk, so a rejected
        # submit cannot leave the stored picture deleted while the database
        # still points at it.

        # Prevent removing admin from the last admin.
        if not form.is_admin.data and user.is_admin and total_admins <= 1:
            flash('There must be at least one admin user in the system.', 'error')
            return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user)

        # Check if the new email is already used by another user.
        if form.email.data != user.email:
            existing_user = User.query.filter_by(email=form.email.data).first()
            if existing_user and existing_user.id != user.id:
                flash('A user with this email already exists.', 'error')
                return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user)

        # Handle profile picture upload.
        stale_picture = None
        file = request.files.get('profile_picture')
        if file and file.filename:
            safe_name = secure_filename(file.filename)
            if not safe_name:
                # secure_filename() can strip a name down to nothing; saving
                # would then target the upload directory itself and fail.
                flash('Invalid profile picture file name.', 'error')
                return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user)
            # Prefix with a random token so two users uploading the same
            # file name never overwrite (or later delete) each other's file.
            filename = f"{uuid.uuid4().hex[:12]}_{safe_name}"
            file.save(os.path.join(UPLOAD_FOLDER, filename))
            stale_picture = user.profile_picture
            user.profile_picture = filename

        # Store old values for comparison in the audit event.
        old_values = {
            'user_name': f"{user.username} {user.last_name}",
            'email': user.email,
            'phone': user.phone,
            'company': user.company,
            'position': user.position,
            'is_admin': user.is_admin
        }

        user.username = form.first_name.data
        user.last_name = form.last_name.data
        user.email = form.email.data
        user.phone = form.phone.data
        user.company = form.company.data
        user.position = form.position.data
        user.notes = form.notes.data
        user.is_admin = form.is_admin.data

        # Set password if provided.
        password_changed = False
        if form.new_password.data:
            user.set_password(form.new_password.data)
            password_changed = True

        db.session.commit()

        # The replaced picture is removed only once the new one is committed.
        if stale_picture:
            _remove_picture_file(stale_picture)

        # Log user update event.
        log_event(
            event_type='user_update',
            details={
                'user_id': user.id,
                'user_name': f"{user.username} {user.last_name}",
                'updated_by': current_user.id,
                'updated_by_name': f"{current_user.username} {current_user.last_name}",
                'old_values': old_values,
                'new_values': {
                    'user_name': f"{user.username} {user.last_name}",
                    'email': user.email,
                    'phone': user.phone,
                    'company': user.company,
                    'position': user.position,
                    'is_admin': user.is_admin
                },
                'password_changed': password_changed,
                'method': 'admin_update'
            }
        )
        db.session.commit()

        flash('User updated successfully!', 'success')
        return redirect(url_for('contacts.contacts_list'))

    return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user)
@contacts_bp.route('/<int:id>/delete', methods=['POST'])
@login_required
@require_password_change
def delete_contact(id):
    """Delete a user account (admin only).

    Admins cannot delete their own account. A ``user_delete`` event is
    recorded together with the deletion.

    Args:
        id: Primary key of the user to delete; responds 404 when not found.

    Returns:
        A redirect to the contact list.
    """
    result = admin_required()
    if result:
        return result

    user = User.query.get_or_404(id)

    # Compare primary keys rather than emails: nothing visible here
    # guarantees emails are unique, whereas the id identifies one account.
    if user.id == current_user.id:
        flash('You cannot delete your own account.', 'error')
        return redirect(url_for('contacts.contacts_list'))

    # Log user deletion event.
    log_event(
        event_type='user_delete',
        details={
            'user_id': user.id,
            'user_name': f"{user.username} {user.last_name}",
            'deleted_by': current_user.id,
            'deleted_by_name': f"{current_user.username} {current_user.last_name}",
            'email': user.email,
            'is_admin': user.is_admin
        }
    )
    # No intermediate commit: the audit entry and the deletion go through
    # the same commit, so a failed delete does not leave a "deleted" record
    # behind (assumes log_event only adds to the session - TODO confirm).
    db.session.delete(user)
    db.session.commit()

    flash('User deleted successfully!', 'success')
    return redirect(url_for('contacts.contacts_list'))
@contacts_bp.route('/<int:id>/toggle-active', methods=['POST'])
@login_required
@require_password_change
def toggle_active(id):
    """Flip a user's active flag (admin only).

    Admins cannot change the status of their own account. A ``user_update``
    event with ``update_type`` ``status_change`` is recorded.

    Args:
        id: Primary key of the user to toggle; responds 404 when not found.

    Returns:
        A redirect to the contact list.
    """
    result = admin_required()
    if result:
        return result

    user = User.query.get_or_404(id)

    # Compare primary keys rather than emails: nothing visible here
    # guarantees emails are unique, whereas the id identifies one account.
    if user.id == current_user.id:
        flash('You cannot deactivate your own account.', 'error')
        return redirect(url_for('contacts.contacts_list'))

    old_status = user.is_active
    user.is_active = not user.is_active

    # Log status change event.
    log_event(
        event_type='user_update',
        details={
            'user_id': user.id,
            'user_name': f"{user.username} {user.last_name}",
            'updated_by': current_user.id,
            'updated_by_name': f"{current_user.username} {current_user.last_name}",
            'update_type': 'status_change',
            'old_status': old_status,
            'new_status': user.is_active,
            'email': user.email
        }
    )
    db.session.commit()

    flash(f'User marked as {"active" if user.is_active else "inactive"}!', 'success')
    return redirect(url_for('contacts.contacts_list'))