# Listing metadata (presumably left over from the file viewer this was copied from): 462 lines, 19 KiB, Python
import secrets
import string
from datetime import datetime, timedelta
from functools import wraps
from urllib.parse import urlsplit

from flask import render_template, request, flash, redirect, url_for, Blueprint, jsonify
from flask_login import login_user, logout_user, login_required, current_user

from models import db, User, Notif, PasswordSetupToken, PasswordResetToken
from utils import log_event, create_notification, get_unread_count
from utils.notification import generate_mail_from_notification

# Module-level blueprint created under the name 'auth'.
# NOTE(review): init_routes() takes its own ``auth_bp`` parameter, which
# shadows this name inside that function -- confirm whether callers pass this
# same object or build a separate blueprint.
auth_bp = Blueprint('auth', __name__)

# --- View decorators ---
def require_password_change(f):
    """Decorate a view so users with a pending password step are redirected.

    For an authenticated ``current_user`` the wrapper checks, in order:

    1. whether an unused ``PasswordSetupToken`` whose ``expires_at`` is still
       in the future exists for the user -- if so, it flashes a warning and
       redirects to ``auth.setup_password`` with that token;
    2. whether ``current_user.check_password('changeme')`` succeeds -- if so,
       it flashes a warning and redirects to ``auth.change_password``.

    In every other case (including anonymous users) the wrapped view is
    called with its original arguments.

    Args:
        f: The view function to protect.

    Returns:
        The wrapping function, carrying ``f``'s metadata via ``functools.wraps``.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if current_user.is_authenticated:
            # Keep the matching row itself rather than a boolean: the redirect
            # must carry *this* token. Indexing the user's token collection
            # (``password_setup_tokens[0]``) could pick a used or expired one.
            valid_token = (
                PasswordSetupToken.query
                .filter_by(user_id=current_user.id, used=False)
                .filter(PasswordSetupToken.expires_at > datetime.utcnow())
                .first()
            )

            if valid_token is not None:
                flash('Please set up your password before continuing.', 'warning')
                return redirect(url_for('auth.setup_password', token=valid_token.token))
            elif current_user.check_password('changeme'):
                flash('Please change your password before continuing.', 'warning')
                return redirect(url_for('auth.change_password'))
        return f(*args, **kwargs)
    return decorated_function

def init_routes(auth_bp):
    """Register the authentication views and template context on *auth_bp*.

    Attaches a context processor plus the ``/login``, ``/register``,
    ``/logout``, ``/change-password``, ``/setup-password/<token>``,
    ``/forgot-password`` and ``/reset-password/<token>`` routes.

    Args:
        auth_bp: Object exposing ``context_processor`` and ``route``
            decorators (the views below are bound to this object, not to any
            module-level name).
    """

    @auth_bp.context_processor
    def inject_unread_notifications():
        """Expose ``unread_notifications`` to templates (0 when anonymous)."""
        if current_user.is_authenticated:
            unread_count = get_unread_count(current_user.id)
            return {'unread_notifications': unread_count}
        return {'unread_notifications': 0}

    @auth_bp.route('/login', methods=['GET', 'POST'])
    def login():
        """Show the login form (GET) or authenticate the posted credentials."""
        if current_user.is_authenticated:
            return redirect(url_for('main.dashboard'))

        if request.method == 'POST':
            email = request.form.get('email')
            password = request.form.get('password')
            remember = bool(request.form.get('remember'))

            user = User.query.filter_by(email=email).first()

            if not user or not user.check_password(password):
                # Log failed login attempt
                log_event(
                    event_type='user_login',
                    details={
                        'email': email,
                        'success': False,
                        'reason': 'invalid_credentials'
                    }
                )
                db.session.commit()
                flash('Please check your login details and try again.', 'danger')
                return redirect(url_for('auth.login'))

            login_user(user, remember=remember)

            # Log successful login
            log_event(
                event_type='user_login',
                details={
                    'user_id': user.id,
                    'user_name': f"{user.username} {user.last_name}",
                    'email': user.email,
                    'success': True,
                    'remember': remember
                }
            )
            db.session.commit()

            # Check if user is using default password
            if password == 'changeme':
                flash('Please change your password before continuing.', 'warning')
                return redirect(url_for('auth.change_password'))

            # ``next`` comes straight from the query string, so only follow it
            # when it is a same-site relative path; anything else would let a
            # crafted login link bounce the user to an arbitrary site.
            next_page = request.args.get('next')
            if _is_safe_redirect_target(next_page):
                return redirect(next_page)
            return redirect(url_for('main.dashboard'))

        return render_template('auth/login.html')

    @auth_bp.route('/register', methods=['GET', 'POST'])
    def register():
        """Show the registration form (GET) or create and log in a new user."""
        if current_user.is_authenticated:
            return redirect(url_for('main.dashboard'))

        if request.method == 'POST':
            email = request.form.get('email')
            username = request.form.get('username')
            password = request.form.get('password')

            # A missing field would otherwise reach User(...)/set_password()
            # as None; reject it up front instead.
            # NOTE(review): the complexity rules enforced by setup_password /
            # reset_password are not applied here -- confirm whether they
            # should be.
            if not email or not username or not password:
                flash('Please fill in all fields.', 'danger')
                return redirect(url_for('auth.register'))

            user = User.query.filter_by(email=email).first()
            if user:
                flash('Email address already exists', 'danger')
                return redirect(url_for('auth.register'))

            user = User.query.filter_by(username=username).first()
            if user:
                flash('Username already exists', 'danger')
                return redirect(url_for('auth.register'))

            new_user = User(email=email, username=username)
            new_user.set_password(password)

            db.session.add(new_user)
            # Commit first so new_user.id is populated for the records below.
            db.session.commit()

            # Create notification for the new user
            create_notification(
                notif_type='account_created',
                user_id=new_user.id,
                details={
                    'message': 'Welcome to DocuPulse! Your account has been created successfully.',
                    'username': new_user.username,
                    'email': new_user.email,
                    'timestamp': datetime.utcnow().isoformat()
                }
            )

            # Log successful registration
            log_event(
                event_type='user_create',
                details={
                    'user_id': new_user.id,
                    'user_name': f"{new_user.username} {new_user.last_name}",
                    'email': new_user.email,
                    'method': 'web_form'
                }
            )
            db.session.commit()

            login_user(new_user)
            return redirect(url_for('main.dashboard'))

        return render_template('auth/register.html')

    @auth_bp.route('/logout')
    @login_required
    def logout():
        """Record the logout event, end the session and go to the login page."""
        # Log logout event before logging out (current_user is still set here)
        log_event(
            event_type='user_logout',
            details={
                'user_id': current_user.id,
                'user_name': f"{current_user.username} {current_user.last_name}",
                'email': current_user.email
            }
        )
        db.session.commit()
        logout_user()
        return redirect(url_for('auth.login'))

    @auth_bp.route('/change-password', methods=['GET', 'POST'])
    @login_required
    def change_password():
        """Show the change-password form (GET) or apply a password change."""

        def log_failed_change(reason):
            # Record a rejected password-change attempt and persist it.
            log_event(
                event_type='user_update',
                details={
                    'user_id': current_user.id,
                    'user_name': f"{current_user.username} {current_user.last_name}",
                    'update_type': 'password_change',
                    'success': False,
                    'reason': reason
                }
            )
            db.session.commit()

        if request.method == 'POST':
            current_password = request.form.get('current_password')
            new_password = request.form.get('new_password')
            confirm_password = request.form.get('confirm_password')

            if not current_user.check_password(current_password):
                log_failed_change('invalid_current_password')
                flash('Current password is incorrect.', 'danger')
                return redirect(url_for('auth.change_password'))

            if new_password != confirm_password:
                log_failed_change('passwords_dont_match')
                flash('New passwords do not match.', 'danger')
                return redirect(url_for('auth.change_password'))

            # Two blank/missing fields compare equal above, so guard against
            # storing an empty password explicitly.
            # NOTE(review): the complexity rules enforced by setup_password /
            # reset_password are not applied here -- confirm whether they
            # should be.
            if not new_password:
                log_failed_change('empty_new_password')
                flash('New password cannot be empty.', 'danger')
                return redirect(url_for('auth.change_password'))

            current_user.set_password(new_password)

            # Create password change notification
            create_notification(
                notif_type='password_changed',
                user_id=current_user.id,
                details={
                    'message': 'Your password has been changed successfully.',
                    'timestamp': datetime.utcnow().isoformat()
                }
            )

            # Log successful password change
            log_event(
                event_type='user_update',
                details={
                    'user_id': current_user.id,
                    'user_name': f"{current_user.username} {current_user.last_name}",
                    'update_type': 'password_change',
                    'success': True
                }
            )
            db.session.commit()

            flash('Password changed successfully!', 'success')
            return redirect(url_for('main.dashboard'))

        return render_template('auth/change_password.html')

    @auth_bp.route('/setup-password/<token>', methods=['GET', 'POST'])
    def setup_password(token):
        """Let the owner of a valid setup *token* choose their password."""
        # Find the token
        setup_token = PasswordSetupToken.query.filter_by(token=token).first()

        if not setup_token or not setup_token.is_valid():
            flash('Invalid or expired password setup link. Please contact your administrator for a new link.', 'error')
            return redirect(url_for('auth.login'))

        if request.method == 'POST':
            password = request.form.get('password')
            confirm_password = request.form.get('confirm_password')

            if not password or not confirm_password:
                flash('Please fill in all fields.', 'error')
                return render_template('auth/setup_password.html')

            if password != confirm_password:
                flash('Passwords do not match.', 'error')
                return render_template('auth/setup_password.html')

            # Password requirements
            policy_error = _password_policy_error(password)
            if policy_error:
                flash(policy_error, 'error')
                return render_template('auth/setup_password.html')

            # Update user's password
            user = setup_token.user
            user.set_password(password)

            # Mark token as used
            setup_token.used = True

            # Create password change notification
            create_notification(
                notif_type='password_changed',
                user_id=user.id,
                details={
                    'message': 'Your password has been set up successfully.',
                    'timestamp': datetime.utcnow().isoformat()
                }
            )

            # Log password setup event
            log_event(
                event_type='user_update',
                user_id=user.id,
                details={
                    'user_id': user.id,
                    'user_name': f"{user.username} {user.last_name}",
                    'update_type': 'password_setup',
                    'success': True
                }
            )

            db.session.commit()

            # Log the user in and redirect to dashboard
            login_user(user)
            flash('Password set up successfully! Welcome to DocuPulse.', 'success')
            return redirect(url_for('main.dashboard'))

        return render_template('auth/setup_password.html')

    @auth_bp.route('/forgot-password', methods=['GET', 'POST'])
    def forgot_password():
        """Show the forgot-password form (GET) or issue a reset token."""
        if current_user.is_authenticated:
            return redirect(url_for('main.dashboard'))

        if request.method == 'POST':
            email = request.form.get('email')

            if not email:
                flash('Please enter your email address.', 'error')
                return render_template('auth/forgot_password.html')

            # Check if user exists
            user = User.query.filter_by(email=email).first()

            if user:
                # Generate a secure token
                token = secrets.token_urlsafe(32)

                # Compute the expiry once so the stored value and the one
                # quoted in the notification cannot drift apart.
                expires_at = datetime.utcnow() + timedelta(hours=1)  # 1 hour expiration

                # Create password reset token
                reset_token = PasswordResetToken(
                    user_id=user.id,
                    token=token,
                    expires_at=expires_at,
                    ip_address=request.remote_addr
                )
                db.session.add(reset_token)

                # Create notification for password reset
                notif = create_notification(
                    notif_type='password_reset',
                    user_id=user.id,
                    details={
                        'message': 'You requested a password reset. Click the link below to reset your password.',
                        'reset_link': url_for('auth.reset_password', token=token, _external=True),
                        'expiry_time': expires_at.strftime('%Y-%m-%d %H:%M:%S UTC'),
                        'ip_address': request.remote_addr,
                        'timestamp': datetime.utcnow().isoformat()
                    },
                    generate_mail=False  # Don't auto-generate email, we'll do it manually
                )

                # Generate and send email manually
                if notif:
                    generate_mail_from_notification(notif)

                # Log the password reset request
                log_event(
                    event_type='user_update',
                    details={
                        'user_id': user.id,
                        'user_name': f"{user.username} {user.last_name}",
                        'email': user.email,
                        'update_type': 'password_reset_request',
                        'ip_address': request.remote_addr,
                        'success': True
                    }
                )

                db.session.commit()

            # Always show success message to prevent email enumeration
            flash('If an account with that email exists, a password reset link has been sent to your email address.', 'success')
            return redirect(url_for('auth.login'))

        return render_template('auth/forgot_password.html')

    @auth_bp.route('/reset-password/<token>', methods=['GET', 'POST'])
    def reset_password(token):
        """Let the owner of a valid reset *token* choose a new password."""
        if current_user.is_authenticated:
            return redirect(url_for('main.dashboard'))

        # Find the token
        reset_token = PasswordResetToken.query.filter_by(token=token).first()

        if not reset_token or not reset_token.is_valid():
            flash('Invalid or expired password reset link. Please request a new password reset.', 'error')
            return redirect(url_for('auth.forgot_password'))

        if request.method == 'POST':
            password = request.form.get('password')
            confirm_password = request.form.get('confirm_password')

            if not password or not confirm_password:
                flash('Please fill in all fields.', 'error')
                return render_template('auth/reset_password.html', token=token)

            if password != confirm_password:
                flash('Passwords do not match.', 'error')
                return render_template('auth/reset_password.html', token=token)

            # Password requirements
            policy_error = _password_policy_error(password)
            if policy_error:
                flash(policy_error, 'error')
                return render_template('auth/reset_password.html', token=token)

            # Update user's password
            user = reset_token.user
            user.set_password(password)

            # Mark token as used
            reset_token.used = True

            # Create password change notification
            create_notification(
                notif_type='password_changed',
                user_id=user.id,
                details={
                    'message': 'Your password has been reset successfully.',
                    'timestamp': datetime.utcnow().isoformat()
                }
            )

            # Log password reset event
            log_event(
                event_type='user_update',
                details={
                    'user_id': user.id,
                    'user_name': f"{user.username} {user.last_name}",
                    'email': user.email,
                    'update_type': 'password_reset',
                    'ip_address': request.remote_addr,
                    'success': True
                }
            )

            db.session.commit()

            # Log the user in and redirect to dashboard
            login_user(user)
            flash('Password reset successfully! Welcome back to DocuPulse.', 'success')
            return redirect(url_for('main.dashboard'))

        return render_template('auth/reset_password.html', token=token)


def _password_policy_error(password):
    """Return the message for the first unmet password rule, or ``None``.

    Rules are checked in a fixed order: minimum length of 8, then at least
    one uppercase letter, lowercase letter, digit and ``string.punctuation``
    character.

    Args:
        password: Non-empty candidate password string.
    """
    if len(password) < 8:
        return 'Password must be at least 8 characters long.'

    character_rules = (
        (str.isupper, 'Password must contain at least one uppercase letter.'),
        (str.islower, 'Password must contain at least one lowercase letter.'),
        (str.isdigit, 'Password must contain at least one number.'),
        (lambda c: c in string.punctuation,
         'Password must contain at least one special character.'),
    )
    for matches, message in character_rules:
        if not any(matches(c) for c in password):
            return message
    return None


def _is_safe_redirect_target(target):
    """Return ``True`` only when *target* is a same-site relative path.

    Rejects empty values, anything not starting with a single ``/``, values
    containing backslashes or tab/CR/LF (which browsers may normalise into
    ``//host``), and anything ``urlsplit`` parses with a scheme or host.
    """
    if not target or not target.startswith('/'):
        return False
    if target.startswith('//') or any(ch in target for ch in '\\\t\r\n'):
        return False
    parts = urlsplit(target)
    return not parts.scheme and not parts.netloc