added manager user type

This commit is contained in:
2025-06-05 14:43:06 +02:00
parent 164e8373a4
commit 33f6e0386b
24 changed files with 226 additions and 128 deletions

View File

@@ -29,8 +29,8 @@ def inject_unread_notifications():
def admin_required():
if not current_user.is_authenticated:
return redirect(url_for('auth.login'))
if not current_user.is_admin:
flash('You must be an admin to access this page.', 'error')
if not (current_user.is_admin or current_user.is_manager):
flash('You must be an admin or manager to access this page.', 'error')
return redirect(url_for('main.dashboard'))
@contacts_bp.route('/')
@@ -72,8 +72,10 @@ def contacts_list():
# Apply role filter
if role == 'admin':
query = query.filter(User.is_admin == True)
elif role == 'manager':
query = query.filter(User.is_manager == True)
elif role == 'user':
query = query.filter(User.is_admin == False)
query = query.filter(User.is_admin == False, User.is_manager == False)
# Order by creation date
query = query.order_by(User.created_at.desc())
@@ -97,8 +99,13 @@ def new_contact():
total_admins = User.query.filter_by(is_admin=True).count()
if request.method == 'GET':
form.is_admin.data = False # Ensure admin role is unchecked by default
elif request.method == 'POST' and 'is_admin' not in request.form:
form.is_admin.data = False # Explicitly set to False if not present in POST
form.is_manager.data = False # Ensure manager role is unchecked by default
elif request.method == 'POST':
if 'is_admin' not in request.form:
form.is_admin.data = False
if 'is_manager' not in request.form:
form.is_manager.data = False
if form.validate_on_submit():
# Check if a user with this email already exists
existing_user = User.query.filter_by(email=form.email.data).first()
@@ -130,9 +137,10 @@ def new_contact():
notes=form.notes.data,
is_active=True, # Set default value
is_admin=form.is_admin.data,
is_manager=form.is_manager.data,
profile_picture=profile_picture
)
user.set_password(random_password) # Set random password
user.set_password(random_password)
db.session.add(user)
db.session.commit()
@@ -171,6 +179,7 @@ def new_contact():
'user_name': f"{user.username} {user.last_name}",
'email': user.email,
'is_admin': user.is_admin,
'is_manager': user.is_manager,
'method': 'admin_creation'
}
)

View File

@@ -61,8 +61,8 @@ def conversations():
@login_required
@require_password_change
def create_conversation():
if not current_user.is_admin:
flash('Only administrators can create conversations.', 'error')
if not (current_user.is_admin or current_user.is_manager):
flash('Only administrators and managers can create conversations.', 'error')
return redirect(url_for('conversations.conversations'))
form = ConversationForm()
@@ -148,8 +148,8 @@ def conversation(conversation_id):
# Query messages directly using the Message model
messages = Message.query.filter_by(conversation_id=conversation_id).order_by(Message.created_at.asc()).all()
# Get all users for member selection (only needed for admin)
all_users = User.query.all() if current_user.is_admin else None
# Get all users for member selection (needed for admin and manager)
all_users = User.query.all() if (current_user.is_admin or current_user.is_manager) else None
unread_count = get_unread_count(current_user.id)
return render_template('conversations/conversation.html',
@@ -167,8 +167,8 @@ def conversation_members(conversation_id):
flash('You do not have access to this conversation.', 'error')
return redirect(url_for('conversations.conversations'))
if not current_user.is_admin:
flash('Only administrators can manage conversation members.', 'error')
if not (current_user.is_admin or current_user.is_manager):
flash('Only administrators and managers can manage conversation members.', 'error')
return redirect(url_for('conversations.conversation', conversation_id=conversation_id))
available_users = User.query.filter(~User.id.in_([m.id for m in conversation.members])).all()

View File

@@ -273,11 +273,36 @@ def init_routes(main_bp):
).group_by('extension').all()
# Get conversation stats
conversation_count = Conversation.query.count()
message_count = Message.query.count()
attachment_count = MessageAttachment.query.count()
conversation_total_size = db.session.query(func.sum(MessageAttachment.size)).scalar() or 0
recent_conversations = Conversation.query.order_by(Conversation.created_at.desc()).limit(5).all()
if current_user.is_admin:
conversation_count = Conversation.query.count()
message_count = Message.query.count()
attachment_count = MessageAttachment.query.count()
conversation_total_size = db.session.query(func.sum(MessageAttachment.size)).scalar() or 0
recent_conversations = Conversation.query.order_by(Conversation.created_at.desc()).limit(5).all()
else:
# Get conversations where user is a member
user_conversations = Conversation.query.filter(Conversation.members.any(id=current_user.id)).all()
conversation_count = len(user_conversations)
# Get message count for user's conversations
conversation_ids = [conv.id for conv in user_conversations]
message_count = Message.query.filter(Message.conversation_id.in_(conversation_ids)).count()
# Get attachment count and size for user's conversations
attachment_stats = db.session.query(
func.count(MessageAttachment.id).label('count'),
func.sum(MessageAttachment.size).label('total_size')
).filter(MessageAttachment.message_id.in_(
db.session.query(Message.id).filter(Message.conversation_id.in_(conversation_ids))
)).first()
attachment_count = attachment_stats.count or 0
conversation_total_size = attachment_stats.total_size or 0
# Get recent conversations for the user
recent_conversations = Conversation.query.filter(
Conversation.members.any(id=current_user.id)
).order_by(Conversation.created_at.desc()).limit(5).all()
return render_template('dashboard/dashboard.html',
room_count=room_count,

View File

@@ -85,7 +85,7 @@ def create_room():
@require_password_change
def room(room_id):
room = Room.query.get_or_404(room_id)
# Admins always have access
# Admins always have access, managers need to be members
if not current_user.is_admin:
is_member = RoomMemberPermission.query.filter_by(room_id=room_id, user_id=current_user.id).first() is not None
if not is_member:
@@ -116,14 +116,15 @@ def room(room_id):
@require_password_change
def room_members(room_id):
room = Room.query.get_or_404(room_id)
# Admins always have access
# Check if user is a member
if not current_user.is_admin:
is_member = RoomMemberPermission.query.filter_by(room_id=room_id, user_id=current_user.id).first() is not None
if not is_member:
flash('You do not have access to this room.', 'error')
return redirect(url_for('rooms.rooms'))
if not current_user.is_admin:
flash('Only administrators can manage room members.', 'error')
# Only admins and managers who are members can manage room members
if not (current_user.is_admin or (current_user.is_manager and is_member)):
flash('Only administrators and managers can manage room members.', 'error')
return redirect(url_for('rooms.room', room_id=room_id))
member_permissions = {p.user_id: p for p in room.member_permissions}
available_users = User.query.filter(~User.id.in_(member_permissions.keys())).all()
@@ -139,8 +140,9 @@ def add_member(room_id):
if not is_member:
flash('You do not have access to this room.', 'error')
return redirect(url_for('rooms.rooms'))
if not current_user.is_admin:
flash('Only administrators can manage room members.', 'error')
# Only admins and managers who are members can manage room members
if not (current_user.is_admin or (current_user.is_manager and is_member)):
flash('Only administrators and managers can manage room members.', 'error')
return redirect(url_for('rooms.room', room_id=room_id))
user_id = request.form.get('user_id')
if not user_id:
@@ -211,59 +213,30 @@ def remove_member(room_id, user_id):
if not is_member:
flash('You do not have access to this room.', 'error')
return redirect(url_for('rooms.rooms'))
if not current_user.is_admin:
flash('Only administrators can manage room members.', 'error')
# Only admins and managers who are members can manage room members
if not (current_user.is_admin or (current_user.is_manager and is_member)):
flash('Only administrators and managers can manage room members.', 'error')
return redirect(url_for('rooms.room', room_id=room_id))
if user_id == room.created_by:
flash('Cannot remove the room creator.', 'error')
else:
perm = RoomMemberPermission.query.filter_by(room_id=room_id, user_id=user_id).first()
if not perm:
flash('User is not a member of this room.', 'error')
if perm:
db.session.delete(perm)
db.session.commit()
flash('Member has been removed from the room.', 'success')
else:
user = User.query.get(user_id)
try:
# Create notification for the removed user
create_notification(
notif_type='room_invite_removed',
user_id=user_id,
sender_id=current_user.id,
details={
'message': f'You have been removed from room "{room.name}"',
'room_id': room_id,
'room_name': room.name,
'removed_by': f"{current_user.username} {current_user.last_name}",
'timestamp': datetime.utcnow().isoformat()
}
)
log_event(
event_type='room_member_remove',
details={
'room_id': room_id,
'room_name': room.name,
'removed_user': f"{user.username} {user.last_name}",
'removed_by': f"{current_user.username} {current_user.last_name}"
},
user_id=current_user.id
)
db.session.delete(perm)
db.session.commit()
flash('User has been removed from the room.', 'success')
except Exception as e:
db.session.rollback()
flash('An error occurred while removing the member.', 'error')
print(f"Error removing member: {str(e)}")
flash('Member not found.', 'error')
return redirect(url_for('rooms.room_members', room_id=room_id))
@rooms_bp.route('/<int:room_id>/members/<int:user_id>/permissions', methods=['POST'])
@login_required
def update_member_permissions(room_id, user_id):
room = Room.query.get_or_404(room_id)
if not current_user.is_admin:
flash('Only administrators can update permissions.', 'error')
# Check if user is a member
is_member = RoomMemberPermission.query.filter_by(room_id=room_id, user_id=current_user.id).first() is not None
if not (current_user.is_admin or (current_user.is_manager and is_member)):
flash('Only administrators and managers can update permissions.', 'error')
return redirect(url_for('rooms.room_members', room_id=room_id))
perm = RoomMemberPermission.query.filter_by(room_id=room_id, user_id=user_id).first()
if not perm:
@@ -312,11 +285,13 @@ def update_member_permissions(room_id, user_id):
@rooms_bp.route('/<int:room_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_room(room_id):
if not current_user.is_admin:
flash('Only administrators can edit rooms.', 'error')
room = Room.query.get_or_404(room_id)
# Check if user is a member
is_member = RoomMemberPermission.query.filter_by(room_id=room_id, user_id=current_user.id).first() is not None
if not (current_user.is_admin or (current_user.is_manager and is_member)):
flash('Only administrators and managers can edit rooms.', 'error')
return redirect(url_for('rooms.rooms'))
room = Room.query.get_or_404(room_id)
form = RoomForm()
if form.validate_on_submit():
@@ -354,11 +329,13 @@ def edit_room(room_id):
@rooms_bp.route('/<int:room_id>/delete', methods=['POST'])
@login_required
def delete_room(room_id):
if not current_user.is_admin:
flash('Only administrators can delete rooms.', 'error')
room = Room.query.get_or_404(room_id)
# Check if user is a member
is_member = RoomMemberPermission.query.filter_by(room_id=room_id, user_id=current_user.id).first() is not None
if not (current_user.is_admin or (current_user.is_manager and is_member)):
flash('Only administrators and managers can delete rooms.', 'error')
return redirect(url_for('rooms.rooms'))
room = Room.query.get_or_404(room_id)
room_name = room.name
try: