first
This commit is contained in:
264
routes/contacts.py
Normal file
264
routes/contacts.py
Normal file
@@ -0,0 +1,264 @@
|
||||
from flask import Blueprint, render_template, redirect, url_for, flash, request
|
||||
from flask_login import login_required, current_user
|
||||
from models import db, User
|
||||
from forms import UserForm
|
||||
from flask import abort
|
||||
from sqlalchemy import or_
|
||||
import json
|
||||
import os
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts')
|
||||
|
||||
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads', 'profile_pics')
|
||||
if not os.path.exists(UPLOAD_FOLDER):
|
||||
os.makedirs(UPLOAD_FOLDER)
|
||||
|
||||
def admin_required():
|
||||
if not current_user.is_authenticated:
|
||||
return redirect(url_for('auth.login'))
|
||||
if not current_user.is_admin:
|
||||
flash('You must be an admin to access this page.', 'error')
|
||||
return redirect(url_for('main.dashboard'))
|
||||
|
||||
@contacts_bp.route('/')
|
||||
@login_required
|
||||
def contacts_list():
|
||||
result = admin_required()
|
||||
if result: return result
|
||||
|
||||
# Get query parameters
|
||||
page = request.args.get('page', 1, type=int)
|
||||
per_page = 10 # Number of items per page
|
||||
search = request.args.get('search', '')
|
||||
status = request.args.get('status', '')
|
||||
role = request.args.get('role', '')
|
||||
|
||||
# Start with base query
|
||||
query = User.query
|
||||
|
||||
# Apply search filter
|
||||
if search:
|
||||
search_term = f"%{search}%"
|
||||
query = query.filter(
|
||||
or_(
|
||||
User.username.ilike(search_term),
|
||||
User.last_name.ilike(search_term),
|
||||
User.email.ilike(search_term),
|
||||
User.company.ilike(search_term),
|
||||
User.position.ilike(search_term)
|
||||
)
|
||||
)
|
||||
|
||||
# Apply status filter
|
||||
if status == 'active':
|
||||
query = query.filter(User.is_active == True)
|
||||
elif status == 'inactive':
|
||||
query = query.filter(User.is_active == False)
|
||||
|
||||
# Apply role filter
|
||||
if role == 'admin':
|
||||
query = query.filter(User.is_admin == True)
|
||||
elif role == 'user':
|
||||
query = query.filter(User.is_admin == False)
|
||||
|
||||
# Order by creation date
|
||||
query = query.order_by(User.created_at.desc())
|
||||
|
||||
# Get pagination
|
||||
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
|
||||
users = pagination.items
|
||||
|
||||
return render_template('contacts/list.html',
|
||||
users=users,
|
||||
pagination=pagination,
|
||||
current_user=current_user)
|
||||
|
||||
@contacts_bp.route('/new', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def new_contact():
|
||||
result = admin_required()
|
||||
if result: return result
|
||||
form = UserForm()
|
||||
total_admins = User.query.filter_by(is_admin=True).count()
|
||||
if request.method == 'GET':
|
||||
form.is_admin.data = False # Ensure admin role is unchecked by default
|
||||
elif request.method == 'POST' and 'is_admin' not in request.form:
|
||||
form.is_admin.data = False # Explicitly set to False if not present in POST
|
||||
if form.validate_on_submit():
|
||||
# Check if a user with this email already exists
|
||||
existing_user = User.query.filter_by(email=form.email.data).first()
|
||||
if existing_user:
|
||||
flash('A user with this email already exists.', 'error')
|
||||
return render_template('contacts/form.html', form=form, title='New User', total_admins=total_admins)
|
||||
|
||||
# Handle profile picture upload
|
||||
profile_picture = None
|
||||
file = request.files.get('profile_picture')
|
||||
if file and file.filename:
|
||||
filename = secure_filename(file.filename)
|
||||
file_path = os.path.join(UPLOAD_FOLDER, filename)
|
||||
file.save(file_path)
|
||||
profile_picture = filename
|
||||
|
||||
# Create new user account
|
||||
user = User(
|
||||
username=form.first_name.data,
|
||||
last_name=form.last_name.data,
|
||||
email=form.email.data,
|
||||
phone=form.phone.data,
|
||||
company=form.company.data,
|
||||
position=form.position.data,
|
||||
notes=form.notes.data,
|
||||
is_active=form.is_active.data,
|
||||
is_admin=form.is_admin.data,
|
||||
profile_picture=profile_picture
|
||||
)
|
||||
user.set_password('changeme') # Set a default password that must be changed
|
||||
db.session.add(user)
|
||||
db.session.commit()
|
||||
flash('User created successfully! They will need to set their password on first login.', 'success')
|
||||
return redirect(url_for('contacts.contacts_list'))
|
||||
return render_template('contacts/form.html', form=form, title='New User', total_admins=total_admins)
|
||||
|
||||
@contacts_bp.route('/profile/edit', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def edit_profile():
|
||||
form = UserForm()
|
||||
total_admins = User.query.filter_by(is_admin=True).count()
|
||||
|
||||
if form.validate_on_submit():
|
||||
# Check if trying to remove admin status from the only admin
|
||||
if not form.is_admin.data and current_user.is_admin:
|
||||
if total_admins <= 1:
|
||||
flash('There must be at least one admin user in the system.', 'error')
|
||||
return render_template('contacts/form.html', form=form, title='Edit Profile', total_admins=total_admins)
|
||||
|
||||
current_user.username = form.first_name.data
|
||||
current_user.last_name = form.last_name.data
|
||||
current_user.email = form.email.data
|
||||
current_user.phone = form.phone.data
|
||||
current_user.company = form.company.data
|
||||
current_user.position = form.position.data
|
||||
current_user.notes = form.notes.data
|
||||
current_user.is_active = form.is_active.data
|
||||
current_user.is_admin = form.is_admin.data
|
||||
# Set password if provided
|
||||
if form.new_password.data:
|
||||
current_user.set_password(form.new_password.data)
|
||||
db.session.commit()
|
||||
flash('Profile updated successfully!', 'success')
|
||||
return redirect(url_for('contacts.contacts_list'))
|
||||
|
||||
# Pre-fill the form with current user data
|
||||
if request.method == 'GET':
|
||||
form.first_name.data = current_user.username
|
||||
form.last_name.data = current_user.last_name
|
||||
form.email.data = current_user.email
|
||||
form.phone.data = current_user.phone
|
||||
form.company.data = current_user.company
|
||||
form.position.data = current_user.position
|
||||
form.notes.data = current_user.notes
|
||||
form.is_active.data = current_user.is_active
|
||||
form.is_admin.data = current_user.is_admin
|
||||
|
||||
return render_template('contacts/form.html', form=form, title='Edit Profile', total_admins=total_admins)
|
||||
|
||||
@contacts_bp.route('/<int:id>/edit', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def edit_contact(id):
|
||||
result = admin_required()
|
||||
if result: return result
|
||||
total_admins = User.query.filter_by(is_admin=True).count()
|
||||
user = User.query.get_or_404(id)
|
||||
form = UserForm()
|
||||
if request.method == 'GET':
|
||||
form.first_name.data = user.username
|
||||
form.last_name.data = user.last_name
|
||||
form.email.data = user.email
|
||||
form.phone.data = user.phone
|
||||
form.company.data = user.company
|
||||
form.position.data = user.position
|
||||
form.notes.data = user.notes
|
||||
form.is_active.data = user.is_active
|
||||
form.is_admin.data = user.is_admin
|
||||
if form.validate_on_submit():
|
||||
# Handle profile picture removal
|
||||
if 'remove_picture' in request.form:
|
||||
if user.profile_picture:
|
||||
# Delete the old profile picture file
|
||||
old_picture_path = os.path.join(UPLOAD_FOLDER, user.profile_picture)
|
||||
if os.path.exists(old_picture_path):
|
||||
os.remove(old_picture_path)
|
||||
user.profile_picture = None
|
||||
db.session.commit()
|
||||
flash('Profile picture removed successfully!', 'success')
|
||||
return redirect(url_for('contacts.edit_contact', id=user.id))
|
||||
|
||||
# Handle profile picture upload
|
||||
file = request.files.get('profile_picture')
|
||||
if file and file.filename:
|
||||
# Delete old profile picture if it exists
|
||||
if user.profile_picture:
|
||||
old_picture_path = os.path.join(UPLOAD_FOLDER, user.profile_picture)
|
||||
if os.path.exists(old_picture_path):
|
||||
os.remove(old_picture_path)
|
||||
filename = secure_filename(file.filename)
|
||||
file_path = os.path.join(UPLOAD_FOLDER, filename)
|
||||
file.save(file_path)
|
||||
user.profile_picture = filename
|
||||
|
||||
# Prevent removing admin from the last admin
|
||||
if not form.is_admin.data and user.is_admin and total_admins <= 1:
|
||||
flash('There must be at least one admin user in the system.', 'error')
|
||||
return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user)
|
||||
# Check if the new email is already used by another user
|
||||
if form.email.data != user.email:
|
||||
existing_user = User.query.filter_by(email=form.email.data).first()
|
||||
if existing_user:
|
||||
flash('A user with this email already exists.', 'error')
|
||||
return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user)
|
||||
user.username = form.first_name.data
|
||||
user.last_name = form.last_name.data
|
||||
user.email = form.email.data
|
||||
user.phone = form.phone.data
|
||||
user.company = form.company.data
|
||||
user.position = form.position.data
|
||||
user.notes = form.notes.data
|
||||
user.is_active = form.is_active.data
|
||||
user.is_admin = form.is_admin.data
|
||||
# Set password if provided
|
||||
if form.new_password.data:
|
||||
user.set_password(form.new_password.data)
|
||||
db.session.commit()
|
||||
flash('User updated successfully!', 'success')
|
||||
return redirect(url_for('contacts.contacts_list'))
|
||||
return render_template('contacts/form.html', form=form, title='Edit User', total_admins=total_admins, user=user)
|
||||
|
||||
@contacts_bp.route('/<int:id>/delete', methods=['POST'])
|
||||
@login_required
|
||||
def delete_contact(id):
|
||||
result = admin_required()
|
||||
if result: return result
|
||||
user = User.query.get_or_404(id)
|
||||
if user.email == current_user.email:
|
||||
flash('You cannot delete your own account.', 'error')
|
||||
return redirect(url_for('contacts.contacts_list'))
|
||||
db.session.delete(user)
|
||||
db.session.commit()
|
||||
flash('User deleted successfully!', 'success')
|
||||
return redirect(url_for('contacts.contacts_list'))
|
||||
|
||||
@contacts_bp.route('/<int:id>/toggle-active', methods=['POST'])
|
||||
@login_required
|
||||
def toggle_active(id):
|
||||
result = admin_required()
|
||||
if result: return result
|
||||
user = User.query.get_or_404(id)
|
||||
if user.email == current_user.email:
|
||||
flash('You cannot deactivate your own account.', 'error')
|
||||
return redirect(url_for('contacts.contacts_list'))
|
||||
user.is_active = not user.is_active
|
||||
db.session.commit()
|
||||
flash(f'User marked as {"active" if user.is_active else "inactive"}!', 'success')
|
||||
return redirect(url_for('contacts.contacts_list'))
|
||||
Reference in New Issue
Block a user