from flask import render_template, request, flash, redirect, url_for, Blueprint, jsonify from flask_login import login_user, logout_user, login_required, current_user from models import db, User, Notif, PasswordSetupToken, PasswordResetToken from functools import wraps from datetime import datetime, timedelta from utils import log_event, create_notification, get_unread_count from utils.notification import generate_mail_from_notification import string import secrets auth_bp = Blueprint('auth', __name__) def require_password_change(f): @wraps(f) def decorated_function(*args, **kwargs): if current_user.is_authenticated: # Check if user has any valid password setup tokens has_valid_token = PasswordSetupToken.query.filter_by( user_id=current_user.id, used=False ).filter(PasswordSetupToken.expires_at > datetime.utcnow()).first() is not None if has_valid_token: flash('Please set up your password before continuing.', 'warning') return redirect(url_for('auth.setup_password', token=current_user.password_setup_tokens[0].token)) elif current_user.check_password('changeme'): flash('Please change your password before continuing.', 'warning') return redirect(url_for('auth.change_password')) return f(*args, **kwargs) return decorated_function def init_routes(auth_bp): @auth_bp.context_processor def inject_unread_notifications(): if current_user.is_authenticated: unread_count = get_unread_count(current_user.id) return {'unread_notifications': unread_count} return {'unread_notifications': 0} @auth_bp.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: return redirect(url_for('main.dashboard')) if request.method == 'POST': email = request.form.get('email') password = request.form.get('password') remember = True if request.form.get('remember') else False user = User.query.filter_by(email=email).first() if not user or not user.check_password(password): # Log failed login attempt log_event( event_type='user_login', details={ 'email': email, 'success': False, 'reason': 'invalid_credentials' } ) db.session.commit() flash('Please check your login details and try again.', 'danger') return redirect(url_for('auth.login')) login_user(user, remember=remember) # Log successful login log_event( event_type='user_login', details={ 'user_id': user.id, 'user_name': f"{user.username} {user.last_name}", 'email': user.email, 'success': True, 'remember': remember } ) db.session.commit() # Check if user is using default password if password == 'changeme': flash('Please change your password before continuing.', 'warning') return redirect(url_for('auth.change_password')) next_page = request.args.get('next') if next_page: return redirect(next_page) return redirect(url_for('main.dashboard')) return render_template('auth/login.html') @auth_bp.route('/register', methods=['GET', 'POST']) def register(): if current_user.is_authenticated: return redirect(url_for('main.dashboard')) if request.method == 'POST': email = request.form.get('email') username = request.form.get('username') password = request.form.get('password') user = User.query.filter_by(email=email).first() if user: flash('Email address already exists', 'danger') return redirect(url_for('auth.register')) user = User.query.filter_by(username=username).first() if user: flash('Username already exists', 'danger') return redirect(url_for('auth.register')) new_user = User(email=email, username=username) new_user.set_password(password) db.session.add(new_user) db.session.commit() # Create notification for the new user create_notification( notif_type='account_created', user_id=new_user.id, details={ 'message': 'Welcome to DocuPulse! Your account has been created successfully.', 'username': new_user.username, 'email': new_user.email, 'timestamp': datetime.utcnow().isoformat() } ) # Log successful registration log_event( event_type='user_create', details={ 'user_id': new_user.id, 'user_name': f"{new_user.username} {new_user.last_name}", 'email': new_user.email, 'method': 'web_form' } ) db.session.commit() login_user(new_user) return redirect(url_for('main.dashboard')) return render_template('auth/register.html') @auth_bp.route('/logout') @login_required def logout(): # Log logout event before logging out log_event( event_type='user_logout', details={ 'user_id': current_user.id, 'user_name': f"{current_user.username} {current_user.last_name}", 'email': current_user.email } ) db.session.commit() logout_user() return redirect(url_for('auth.login')) @auth_bp.route('/change-password', methods=['GET', 'POST']) @login_required def change_password(): if request.method == 'POST': current_password = request.form.get('current_password') new_password = request.form.get('new_password') confirm_password = request.form.get('confirm_password') if not current_user.check_password(current_password): # Log failed password change attempt log_event( event_type='user_update', details={ 'user_id': current_user.id, 'user_name': f"{current_user.username} {current_user.last_name}", 'update_type': 'password_change', 'success': False, 'reason': 'invalid_current_password' } ) db.session.commit() flash('Current password is incorrect.', 'danger') return redirect(url_for('auth.change_password')) if new_password != confirm_password: # Log failed password change attempt log_event( event_type='user_update', details={ 'user_id': current_user.id, 'user_name': f"{current_user.username} {current_user.last_name}", 'update_type': 'password_change', 'success': False, 'reason': 'passwords_dont_match' } ) db.session.commit() flash('New passwords do not match.', 'danger') return redirect(url_for('auth.change_password')) current_user.set_password(new_password) # Create password change notification create_notification( notif_type='password_changed', user_id=current_user.id, details={ 'message': 'Your password has been changed successfully.', 'timestamp': datetime.utcnow().isoformat() } ) # Log successful password change log_event( event_type='user_update', details={ 'user_id': current_user.id, 'user_name': f"{current_user.username} {current_user.last_name}", 'update_type': 'password_change', 'success': True } ) db.session.commit() flash('Password changed successfully!', 'success') return redirect(url_for('main.dashboard')) return render_template('auth/change_password.html') @auth_bp.route('/setup-password/', methods=['GET', 'POST']) def setup_password(token): # Find the token setup_token = PasswordSetupToken.query.filter_by(token=token).first() if not setup_token or not setup_token.is_valid(): flash('Invalid or expired password setup link. Please contact your administrator for a new link.', 'error') return redirect(url_for('auth.login')) if request.method == 'POST': password = request.form.get('password') confirm_password = request.form.get('confirm_password') if not password or not confirm_password: flash('Please fill in all fields.', 'error') return render_template('auth/setup_password.html') if password != confirm_password: flash('Passwords do not match.', 'error') return render_template('auth/setup_password.html') # Password requirements if len(password) < 8: flash('Password must be at least 8 characters long.', 'error') return render_template('auth/setup_password.html') if not any(c.isupper() for c in password): flash('Password must contain at least one uppercase letter.', 'error') return render_template('auth/setup_password.html') if not any(c.islower() for c in password): flash('Password must contain at least one lowercase letter.', 'error') return render_template('auth/setup_password.html') if not any(c.isdigit() for c in password): flash('Password must contain at least one number.', 'error') return render_template('auth/setup_password.html') if not any(c in string.punctuation for c in password): flash('Password must contain at least one special character.', 'error') return render_template('auth/setup_password.html') # Update user's password user = setup_token.user user.set_password(password) # Mark token as used setup_token.used = True # Create password change notification create_notification( notif_type='password_changed', user_id=user.id, details={ 'message': 'Your password has been set up successfully.', 'timestamp': datetime.utcnow().isoformat() } ) # Log password setup event log_event( event_type='user_update', user_id=user.id, details={ 'user_id': user.id, 'user_name': f"{user.username} {user.last_name}", 'update_type': 'password_setup', 'success': True } ) db.session.commit() # Log the user in and redirect to dashboard login_user(user) flash('Password set up successfully! Welcome to DocuPulse.', 'success') return redirect(url_for('main.dashboard')) return render_template('auth/setup_password.html') @auth_bp.route('/forgot-password', methods=['GET', 'POST']) def forgot_password(): if current_user.is_authenticated: return redirect(url_for('main.dashboard')) if request.method == 'POST': email = request.form.get('email') if not email: flash('Please enter your email address.', 'error') return render_template('auth/forgot_password.html') # Check if user exists user = User.query.filter_by(email=email).first() if user: # Generate a secure token token = secrets.token_urlsafe(32) # Create password reset token reset_token = PasswordResetToken( user_id=user.id, token=token, expires_at=datetime.utcnow() + timedelta(hours=1), # 1 hour expiration ip_address=request.remote_addr ) db.session.add(reset_token) # Create notification for password reset notif = create_notification( notif_type='password_reset', user_id=user.id, details={ 'message': 'You requested a password reset. Click the link below to reset your password.', 'reset_link': url_for('auth.reset_password', token=token, _external=True), 'expiry_time': (datetime.utcnow() + timedelta(hours=1)).strftime('%Y-%m-%d %H:%M:%S UTC'), 'ip_address': request.remote_addr, 'timestamp': datetime.utcnow().isoformat() }, generate_mail=False # Don't auto-generate email, we'll do it manually ) # Generate and send email manually if notif: generate_mail_from_notification(notif) # Log the password reset request log_event( event_type='user_update', details={ 'user_id': user.id, 'user_name': f"{user.username} {user.last_name}", 'email': user.email, 'update_type': 'password_reset_request', 'ip_address': request.remote_addr, 'success': True } ) db.session.commit() # Always show success message to prevent email enumeration flash('If an account with that email exists, a password reset link has been sent to your email address.', 'success') return redirect(url_for('auth.login')) return render_template('auth/forgot_password.html') @auth_bp.route('/reset-password/', methods=['GET', 'POST']) def reset_password(token): if current_user.is_authenticated: return redirect(url_for('main.dashboard')) # Find the token reset_token = PasswordResetToken.query.filter_by(token=token).first() if not reset_token or not reset_token.is_valid(): flash('Invalid or expired password reset link. Please request a new password reset.', 'error') return redirect(url_for('auth.forgot_password')) if request.method == 'POST': password = request.form.get('password') confirm_password = request.form.get('confirm_password') if not password or not confirm_password: flash('Please fill in all fields.', 'error') return render_template('auth/reset_password.html', token=token) if password != confirm_password: flash('Passwords do not match.', 'error') return render_template('auth/reset_password.html', token=token) # Password requirements if len(password) < 8: flash('Password must be at least 8 characters long.', 'error') return render_template('auth/reset_password.html', token=token) if not any(c.isupper() for c in password): flash('Password must contain at least one uppercase letter.', 'error') return render_template('auth/reset_password.html', token=token) if not any(c.islower() for c in password): flash('Password must contain at least one lowercase letter.', 'error') return render_template('auth/reset_password.html', token=token) if not any(c.isdigit() for c in password): flash('Password must contain at least one number.', 'error') return render_template('auth/reset_password.html', token=token) if not any(c in string.punctuation for c in password): flash('Password must contain at least one special character.', 'error') return render_template('auth/reset_password.html', token=token) # Update user's password user = reset_token.user user.set_password(password) # Mark token as used reset_token.used = True # Create password change notification create_notification( notif_type='password_changed', user_id=user.id, details={ 'message': 'Your password has been reset successfully.', 'timestamp': datetime.utcnow().isoformat() } ) # Log password reset event log_event( event_type='user_update', details={ 'user_id': user.id, 'user_name': f"{user.username} {user.last_name}", 'email': user.email, 'update_type': 'password_reset', 'ip_address': request.remote_addr, 'success': True } ) db.session.commit() # Log the user in and redirect to dashboard login_user(user) flash('Password reset successfully! Welcome back to DocuPulse.', 'success') return redirect(url_for('main.dashboard')) return render_template('auth/reset_password.html', token=token)