500 lines
19 KiB
Python
500 lines
19 KiB
Python
from flask import Blueprint, render_template, redirect, url_for, flash, request, jsonify
|
|
from flask_login import login_required, current_user
|
|
from models import db, User, Notif, PasswordSetupToken
|
|
from forms import UserForm
|
|
from flask import abort
|
|
from sqlalchemy import or_
|
|
from routes.auth import require_password_change
|
|
from utils import log_event, create_notification, get_unread_count
|
|
import json
|
|
import os
|
|
from werkzeug.utils import secure_filename
|
|
from datetime import datetime, timedelta
|
|
import secrets
|
|
import string
|
|
|
|
contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts')
|
|
|
|
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads', 'profile_pics')
|
|
if not os.path.exists(UPLOAD_FOLDER):
|
|
os.makedirs(UPLOAD_FOLDER)
|
|
|
|
@contacts_bp.context_processor
|
|
def inject_unread_notifications():
|
|
if current_user.is_authenticated:
|
|
unread_count = get_unread_count(current_user.id)
|
|
return {'unread_notifications': unread_count}
|
|
return {'unread_notifications': 0}
|
|
|
|
def admin_required():
|
|
if not current_user.is_authenticated:
|
|
return redirect(url_for('auth.login'))
|
|
if not (current_user.is_admin or current_user.is_manager):
|
|
flash('You must be an admin or manager to access this page.', 'error')
|
|
return redirect(url_for('main.dashboard'))
|
|
|
|
@contacts_bp.route('/')
|
|
@login_required
|
|
@require_password_change
|
|
def contacts_list():
|
|
result = admin_required()
|
|
if result: return result
|
|
|
|
# Get query parameters
|
|
page = request.args.get('page', 1, type=int)
|
|
per_page = 10 # Number of items per page
|
|
search = request.args.get('search', '')
|
|
status = request.args.get('status', '')
|
|
role = request.args.get('role', '')
|
|
|
|
# Start with base query
|
|
query = User.query
|
|
|
|
# Apply search filter
|
|
if search:
|
|
search_term = f"%{search}%"
|
|
query = query.filter(
|
|
or_(
|
|
User.username.ilike(search_term),
|
|
User.last_name.ilike(search_term),
|
|
User.email.ilike(search_term),
|
|
User.company.ilike(search_term),
|
|
User.position.ilike(search_term)
|
|
)
|
|
)
|
|
|
|
# Apply status filter
|
|
if status == 'active':
|
|
query = query.filter(User.is_active == True)
|
|
elif status == 'inactive':
|
|
query = query.filter(User.is_active == False)
|
|
|
|
# Apply role filter
|
|
if role == 'admin':
|
|
query = query.filter(User.is_admin == True)
|
|
elif role == 'manager':
|
|
query = query.filter(User.is_manager == True)
|
|
elif role == 'user':
|
|
query = query.filter(User.is_admin == False, User.is_manager == False)
|
|
|
|
# Order by creation date
|
|
query = query.order_by(User.created_at.desc())
|
|
|
|
# Get pagination
|
|
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
|
|
users = pagination.items
|
|
|
|
return render_template('contacts/list.html',
|
|
users=users,
|
|
pagination=pagination,
|
|
current_user=current_user)
|
|
|
|
@contacts_bp.route('/new', methods=['GET', 'POST'])
|
|
@login_required
|
|
@require_password_change
|
|
def new_contact():
|
|
result = admin_required()
|
|
if result: return result
|
|
form = UserForm()
|
|
total_admins = User.query.filter_by(is_admin=True).count()
|
|
if request.method == 'GET':
|
|
form.role.data = 'user' # Default to standard user
|
|
elif request.method == 'POST':
|
|
if form.validate_on_submit():
|
|
# Check if a user with this email already exists
|
|
existing_user = User.query.filter_by(email=form.email.data).first()
|
|
if existing_user:
|
|
flash('A user with this email already exists.', 'error')
|
|
return render_template('contacts/form.html', form=form, title='New User', total_admins=total_admins)
|
|
|
|
# Handle profile picture upload
|
|
profile_picture = None
|
|
file = request.files.get('profile_picture')
|
|
if file and file.filename:
|
|
filename = secure_filename(file.filename)
|
|
file_path = os.path.join(UPLOAD_FOLDER, filename)
|
|
file.save(file_path)
|
|
profile_picture = filename
|
|
|
|
# Generate a random password
|
|
alphabet = string.ascii_letters + string.digits + string.punctuation
|
|
random_password = ''.join(secrets.choice(alphabet) for _ in range(32))
|
|
|
|
# Create new user
|
|
user = User(
|
|
username=form.first_name.data,
|
|
last_name=form.last_name.data,
|
|
email=form.email.data,
|
|
phone=form.phone.data,
|
|
company=form.company.data,
|
|
position=form.position.data,
|
|
notes=form.notes.data,
|
|
is_admin=(form.role.data == 'admin'),
|
|
is_manager=(form.role.data == 'manager'),
|
|
profile_picture=profile_picture
|
|
)
|
|
user.set_password(random_password)
|
|
db.session.add(user)
|
|
|
|
# Log user creation event
|
|
log_event(
|
|
event_type='user_create',
|
|
details={
|
|
'created_by': current_user.id,
|
|
'created_by_name': f"{current_user.username} {current_user.last_name}",
|
|
'user_id': user.id,
|
|
'user_name': f"{user.username} {user.last_name}",
|
|
'email': user.email,
|
|
'role': form.role.data,
|
|
'method': 'admin_creation'
|
|
}
|
|
)
|
|
db.session.commit()
|
|
|
|
flash('User created successfully! They will receive an email with a link to set up their password.', 'success')
|
|
return redirect(url_for('contacts.contacts_list'))
|
|
return render_template('contacts/form.html', form=form, title='New User', total_admins=total_admins)
|
|
|
|
@contacts_bp.route('/profile/edit', methods=['GET', 'POST'])
|
|
@login_required
|
|
@require_password_change
|
|
def edit_profile():
|
|
form = UserForm()
|
|
total_admins = User.query.filter_by(is_admin=True).count()
|
|
|
|
if form.validate_on_submit():
|
|
# Check if trying to remove admin status from the only admin
|
|
if not form.is_admin.data and current_user.is_admin:
|
|
if total_admins <= 1:
|
|
flash('There must be at least one admin user in the system.', 'error')
|
|
return render_template('contacts/form.html', form=form, title='Edit Profile', total_admins=total_admins)
|
|
|
|
# Store old values for comparison
|
|
old_values = {
|
|
'user_name': f"{current_user.username} {current_user.last_name}",
|
|
'email': current_user.email,
|
|
'phone': current_user.phone,
|
|
'company': current_user.company,
|
|
'position': current_user.position,
|
|
'is_admin': current_user.is_admin
|
|
}
|
|
|
|
current_user.username = form.first_name.data
|
|
current_user.last_name = form.last_name.data
|
|
current_user.email = form.email.data
|
|
current_user.phone = form.phone.data
|
|
current_user.company = form.company.data
|
|
current_user.position = form.position.data
|
|
current_user.notes = form.notes.data
|
|
current_user.is_admin = form.is_admin.data
|
|
|
|
# Set password if provided
|
|
password_changed = False
|
|
if form.new_password.data:
|
|
current_user.set_password(form.new_password.data)
|
|
password_changed = True
|
|
|
|
db.session.commit()
|
|
|
|
# Log profile update event
|
|
log_event(
|
|
event_type='user_update',
|
|
details={
|
|
'user_id': current_user.id,
|
|
'user_name': f"{current_user.username} {current_user.last_name}",
|
|
'updated_by': current_user.id,
|
|
'updated_by_name': f"{current_user.username} {current_user.last_name}",
|
|
'old_values': old_values,
|
|
'new_values': {
|
|
'username': current_user.username,
|
|
'last_name': current_user.last_name,
|
|
'email': current_user.email,
|
|
'phone': current_user.phone,
|
|
'company': current_user.company,
|
|
'position': current_user.position,
|
|
'is_admin': current_user.is_admin
|
|
},
|
|
'password_changed': password_changed,
|
|
'method': 'self_update'
|
|
}
|
|
)
|
|
db.session.commit()
|
|
|
|
flash('Profile updated successfully!', 'success')
|
|
return redirect(url_for('contacts.contacts_list'))
|
|
|
|
# Pre-fill the form with current user data
|
|
if request.method == 'GET':
|
|
form.first_name.data = current_user.username
|
|
form.last_name.data = current_user.last_name
|
|
form.email.data = current_user.email
|
|
form.phone.data = current_user.phone
|
|
form.company.data = current_user.company
|
|
form.position.data = current_user.position
|
|
form.notes.data = current_user.notes
|
|
form.is_admin.data = current_user.is_admin
|
|
|
|
return render_template('contacts/form.html', form=form, title='Edit Profile', total_admins=total_admins)
|
|
|
|
@contacts_bp.route('/<int:id>/edit', methods=['GET', 'POST'])
|
|
@login_required
|
|
@require_password_change
|
|
def edit_contact(id):
|
|
result = admin_required()
|
|
if result: return result
|
|
total_admins = User.query.filter_by(is_admin=True).count()
|
|
user = User.query.get_or_404(id)
|
|
form = UserForm()
|
|
if request.method == 'GET':
|
|
form.first_name.data = user.username
|
|
form.last_name.data = user.last_name
|
|
form.email.data = user.email
|
|
form.phone.data = user.phone
|
|
form.company.data = user.company
|
|
form.position.data = user.position
|
|
form.notes.data = user.notes
|
|
# Set role based on current permissions
|
|
if user.is_admin:
|
|
form.role.data = 'admin'
|
|
elif user.is_manager:
|
|
form.role.data = 'manager'
|
|
else:
|
|
form.role.data = 'user'
|
|
if form.validate_on_submit():
|
|
# Handle profile picture removal
|
|
if 'remove_picture' in request.form:
|
|
if user.profile_picture:
|
|
# Delete the old profile picture file
|
|
old_picture_path = os.path.join(UPLOAD_FOLDER, user.profile_picture)
|
|
if os.path.exists(old_picture_path):
|
|
os.remove(old_picture_path)
|
|
user.profile_picture = None
|
|
db.session.commit()
|
|
flash('Profile picture removed successfully!', 'success')
|
|
return redirect(url_for('contacts.edit_contact', id=user.id))
|
|
|
|
# Handle profile picture upload
|
|
file = request.files.get('profile_picture')
|
|
if file and file.filename:
|
|
# Delete old profile picture if it exists
|
|
if user.profile_picture:
|
|
old_picture_path = os.path.join(UPLOAD_FOLDER, user.profile_picture)
|
|
if os.path.exists(old_picture_path):
|
|
os.remove(old_picture_path)
|
|
filename = secure_filename(file.filename)
|
|
file_path = os.path.join(UPLOAD_FOLDER, filename)
|
|
file.save(file_path)
|
|
user.profile_picture = filename
|
|
|
|
# Prevent removing admin from the last admin
|
|
if form.role.data != 'admin' and user.is_admin and total_admins <= 1:
|
|
flash('There must be at least one admin user in the system.', 'error')
|
|
return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user)
|
|
|
|
# Check if the new email is already used by another user
|
|
if form.email.data != user.email:
|
|
existing_user = User.query.filter_by(email=form.email.data).first()
|
|
if existing_user:
|
|
flash('A user with this email already exists.', 'error')
|
|
return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user)
|
|
|
|
# Store old values for comparison
|
|
old_values = {
|
|
'user_name': f"{user.username} {user.last_name}",
|
|
'email': user.email,
|
|
'phone': user.phone,
|
|
'company': user.company,
|
|
'position': user.position,
|
|
'role': 'admin' if user.is_admin else 'manager' if user.is_manager else 'user'
|
|
}
|
|
|
|
user.username = form.first_name.data
|
|
user.last_name = form.last_name.data
|
|
user.email = form.email.data
|
|
user.phone = form.phone.data
|
|
user.company = form.company.data
|
|
user.position = form.position.data
|
|
user.notes = form.notes.data
|
|
user.is_admin = (form.role.data == 'admin')
|
|
user.is_manager = (form.role.data == 'manager')
|
|
|
|
# Set password if provided
|
|
password_changed = False
|
|
if form.new_password.data:
|
|
user.set_password(form.new_password.data)
|
|
password_changed = True
|
|
|
|
db.session.commit()
|
|
|
|
# Create notification for the user being updated
|
|
create_notification(
|
|
notif_type='account_updated',
|
|
user_id=user.id,
|
|
sender_id=current_user.id,
|
|
details={
|
|
'message': 'Your account has been updated by an administrator.',
|
|
'updated_by': f"{current_user.username} {current_user.last_name}",
|
|
'changes': {
|
|
'name': f"{user.username} {user.last_name}",
|
|
'email': user.email,
|
|
'phone': user.phone,
|
|
'company': user.company,
|
|
'position': user.position,
|
|
'role': form.role.data,
|
|
'password_changed': password_changed
|
|
},
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
)
|
|
|
|
# Log user update event
|
|
log_event(
|
|
event_type='user_update',
|
|
details={
|
|
'user_id': user.id,
|
|
'user_name': f"{user.username} {user.last_name}",
|
|
'updated_by': current_user.id,
|
|
'updated_by_name': f"{current_user.username} {current_user.last_name}",
|
|
'old_values': old_values,
|
|
'new_values': {
|
|
'user_name': f"{user.username} {user.last_name}",
|
|
'email': user.email,
|
|
'phone': user.phone,
|
|
'company': user.company,
|
|
'position': user.position,
|
|
'role': form.role.data
|
|
},
|
|
'password_changed': password_changed,
|
|
'method': 'admin_update'
|
|
}
|
|
)
|
|
db.session.commit()
|
|
|
|
flash('User updated successfully!', 'success')
|
|
return redirect(url_for('contacts.contacts_list'))
|
|
return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user)
|
|
|
|
@contacts_bp.route('/<int:id>/delete', methods=['POST'])
|
|
@login_required
|
|
@require_password_change
|
|
def delete_contact(id):
|
|
result = admin_required()
|
|
if result: return result
|
|
user = User.query.get_or_404(id)
|
|
if user.email == current_user.email:
|
|
flash('You cannot delete your own account.', 'error')
|
|
return redirect(url_for('contacts.contacts_list'))
|
|
|
|
# Create notification for the user being deleted
|
|
create_notification(
|
|
notif_type='account_deleted',
|
|
user_id=user.id,
|
|
sender_id=current_user.id,
|
|
details={
|
|
'message': 'Your account has been deleted by an administrator.',
|
|
'deleted_by': f"{current_user.username} {current_user.last_name}",
|
|
'timestamp': datetime.utcnow().isoformat()
|
|
}
|
|
)
|
|
|
|
# Log user deletion event
|
|
log_event(
|
|
event_type='user_delete',
|
|
details={
|
|
'user_id': user.id,
|
|
'user_name': f"{user.username} {user.last_name}",
|
|
'deleted_by': current_user.id,
|
|
'deleted_by_name': f"{current_user.username} {current_user.last_name}",
|
|
'email': user.email,
|
|
'is_admin': user.is_admin
|
|
}
|
|
)
|
|
db.session.commit()
|
|
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
flash('User deleted successfully!', 'success')
|
|
return redirect(url_for('contacts.contacts_list'))
|
|
|
|
@contacts_bp.route('/<int:id>/toggle-active', methods=['POST'])
|
|
@login_required
|
|
@require_password_change
|
|
def toggle_active(id):
|
|
result = admin_required()
|
|
if result: return result
|
|
user = User.query.get_or_404(id)
|
|
if user.email == current_user.email:
|
|
flash('You cannot deactivate your own account.', 'error')
|
|
return redirect(url_for('contacts.contacts_list'))
|
|
|
|
old_status = user.is_active
|
|
user.is_active = not user.is_active
|
|
|
|
# Log status change event
|
|
log_event(
|
|
event_type='user_update',
|
|
details={
|
|
'user_id': user.id,
|
|
'user_name': f"{user.username} {user.last_name}",
|
|
'updated_by': current_user.id,
|
|
'updated_by_name': f"{current_user.username} {current_user.last_name}",
|
|
'update_type': 'status_change',
|
|
'old_status': old_status,
|
|
'new_status': user.is_active,
|
|
'email': user.email
|
|
}
|
|
)
|
|
db.session.commit()
|
|
|
|
flash(f'User marked as {"active" if user.is_active else "inactive"}!', 'success')
|
|
return redirect(url_for('contacts.contacts_list'))
|
|
|
|
@contacts_bp.route('/<int:id>/resend-setup', methods=['POST'])
|
|
@login_required
|
|
@require_password_change
|
|
def resend_setup_link(id):
|
|
result = admin_required()
|
|
if result: return result
|
|
|
|
user = User.query.get_or_404(id)
|
|
|
|
# Create new password setup token
|
|
token = secrets.token_urlsafe(32)
|
|
setup_token = PasswordSetupToken(
|
|
user_id=user.id,
|
|
token=token,
|
|
expires_at=datetime.utcnow() + timedelta(hours=24)
|
|
)
|
|
db.session.add(setup_token)
|
|
|
|
# Create notification for the user
|
|
create_notification(
|
|
notif_type='account_created',
|
|
user_id=user.id,
|
|
sender_id=current_user.id,
|
|
details={
|
|
'message': 'A new password setup link has been sent to you.',
|
|
'username': user.username,
|
|
'email': user.email,
|
|
'created_by': f"{current_user.username} {current_user.last_name}",
|
|
'timestamp': datetime.utcnow().isoformat(),
|
|
'setup_link': url_for('auth.setup_password', token=token, _external=True)
|
|
}
|
|
)
|
|
|
|
# Log the event
|
|
log_event(
|
|
event_type='user_update',
|
|
details={
|
|
'user_id': user.id,
|
|
'user_name': f"{user.username} {user.last_name}",
|
|
'updated_by': current_user.id,
|
|
'updated_by_name': f"{current_user.username} {current_user.last_name}",
|
|
'update_type': 'password_setup_link_resend'
|
|
}
|
|
)
|
|
|
|
db.session.commit()
|
|
|
|
flash('Password setup link has been resent to the user.', 'success')
|
|
return redirect(url_for('contacts.contacts_list')) |