"""
Room Files Management Module

This module provides a Flask Blueprint for managing files within rooms in the
DocuPulse application. It handles file operations such as uploading,
downloading, moving, deleting, and organizing files within room contexts.
The module implements permission-based access control and supports various
file operations including file organization, starring, and trash management.

Key Features:
- File upload and download with permission checks
- Folder creation and management
- File operations (move, rename, delete)
- File starring system
- Trash management with restore capability
- Bulk operations (zip download)
- File search functionality

The module uses a combination of database records (RoomFile model) and actual
file system storage to maintain file metadata and content.
"""

import io
import logging
import os
import shutil
import time
import zipfile
from datetime import datetime

from flask import Blueprint, jsonify, request, abort, send_from_directory, send_file
from flask_login import login_required, current_user
from werkzeug.utils import secure_filename, safe_join

from models import Room, RoomMemberPermission, RoomFile, TrashedFile, db, User, Notif
from utils import log_event, create_notification, get_unread_count

logger = logging.getLogger(__name__)

# Blueprint for room file operations
# NOTE(review): the `<int:room_id>` / `<path:filename>` URL converters used on
# the routes below were reconstructed from the view signatures -- confirm they
# match the deployed URL scheme.
room_files_bp = Blueprint('room_files', __name__, url_prefix='/api/rooms')

# Root directory for storing room files
DATA_ROOT = '/app/uploads/rooms'  # Updated to match Docker volume mount point

# Set of allowed file extensions for upload
ALLOWED_EXTENSIONS = {
    # Documents
    'pdf', 'docx', 'doc', 'txt', 'rtf', 'odt', 'md', 'csv',
    # Spreadsheets
    'xlsx', 'xls', 'ods', 'xlsm',
    # Presentations
    'pptx', 'ppt', 'odp',
    # Images
    'jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp', 'tiff',
    # Archives
    'zip', 'rar', '7z', 'tar', 'gz',
    # Code/Text
    'py', 'js', 'html', 'css', 'json', 'xml', 'sql', 'sh', 'bat',
    # Audio
    'mp3', 'wav', 'ogg', 'm4a', 'flac',
    # Video
    'mp4', 'avi', 'mov', 'wmv', 'flv', 'mkv', 'webm',
    # CAD/Design
    'dwg', 'dxf', 'ai', 'psd', 'eps', 'indd',
    # Other
    'eml', 'msg', 'vcf', 'ics',
}

# Permission attributes dumped by the optional debug log in list_room_files.
_PERMISSION_FLAGS = (
    'can_view', 'can_download', 'can_upload', 'can_delete',
    'can_rename', 'can_move', 'can_share',
)


@room_files_bp.context_processor
def inject_unread_notifications():
    """
    Expose the current user's unread notification count to templates.

    Returns:
        dict: ``{'unread_notifications': <count>}``; the count is 0 for
        anonymous users.
    """
    if current_user.is_authenticated:
        unread_count = get_unread_count(current_user.id)
        return {'unread_notifications': unread_count}
    return {'unread_notifications': 0}


def get_room_dir(room_id):
    """
    Get the absolute path to a room's directory.

    Args:
        room_id (int): The ID of the room

    Returns:
        str: Absolute path to the room's directory
    """
    return os.path.join(DATA_ROOT, str(room_id))


def user_has_permission(room, perm_name):
    """
    Check if the current user has a specific permission in a room.

    Args:
        room (Room): The room object to check permissions for
        perm_name (str): Name of the permission to check

    Returns:
        bool: True if user has permission, False otherwise
    """
    if current_user.is_admin:
        return True
    perm = RoomMemberPermission.query.filter_by(room_id=room.id, user_id=current_user.id).first()
    return getattr(perm, perm_name, False) if perm else False


def allowed_file(filename):
    """
    Check if a file's extension is in the allowed extensions list.

    Args:
        filename (str): Name of the file to check

    Returns:
        bool: True if file extension is allowed, False otherwise
    """
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def clean_path(path):
    """
    Clean a path string by removing leading/trailing slashes.

    Args:
        path (str): Path string to clean

    Returns:
        str: Cleaned path string
    """
    if not path:
        return ''
    return path.strip('/\\')


def _json_body():
    """Return the request's JSON object, or an empty dict when it is missing or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _str_field(data, key):
    """Return ``data[key]`` when it is a string, otherwise an empty string."""
    value = data.get(key, '')
    return value if isinstance(value, str) else ''


def _is_valid_name(name):
    """Return True when *name* is a non-empty single path component that is not hidden."""
    return bool(name) and '/' not in name and '\\' not in name and not name.startswith('.')


def _room_path(room_id, *parts):
    """
    Build a filesystem path inside a room's directory from untrusted parts.

    Returns:
        str | None: The joined path, or None when any part is not a string or
        the result would escape the room directory (e.g. via ``..``).
    """
    if not all(isinstance(part, str) for part in parts):
        return None
    return safe_join(get_room_dir(room_id), *parts)


def _join_rel(parent, name):
    """Join a room-relative folder path and an entry name using '/' separators."""
    return f"{parent}/{name}" if parent else name


def _descendants(room_id, folder_path):
    """
    Build a query for every RoomFile stored inside *folder_path* at any depth.

    Matching on the exact path or on the ``folder_path + '/'`` prefix avoids
    catching sibling folders that merely share a name prefix (``docs`` vs
    ``docs2``); ``autoescape`` keeps ``%`` and ``_`` in names literal.
    """
    return RoomFile.query.filter(
        RoomFile.room_id == room_id,
        (RoomFile.path == folder_path)
        | RoomFile.path.startswith(f"{folder_path}/", autoescape=True),
    )


def _rebase_descendants(room_id, old_folder_path, new_folder_path):
    """Rewrite the stored path of everything under *old_folder_path* to live under *new_folder_path*."""
    for item in _descendants(room_id, old_folder_path).all():
        item.path = new_folder_path + item.path[len(old_folder_path):]


def _display_name(user):
    """Return ``username`` plus the last name when present, or None when there is no user."""
    if not user:
        return None
    name = user.username
    last_name = getattr(user, 'last_name', None)
    if last_name:
        name += ' ' + last_name
    return name


def _serialize_file(f, **extra):
    """
    Convert a RoomFile into the JSON-ready dict shared by the listing endpoints.

    Args:
        f (RoomFile): Record to serialize
        **extra: Endpoint-specific keys merged into the result

    Returns:
        dict: File metadata
    """
    uploader = f.uploader
    entry = {
        'name': f.name,
        'type': f.type,
        'size': f.size if f.type == 'file' else '-',
        'modified': f.modified,
        'uploaded_by': _display_name(uploader),
        'uploader_profile_pic': (getattr(uploader, 'profile_picture', None) or None) if uploader else None,
        'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
        'path': f.path,
    }
    entry.update(extra)
    return entry


def _actor_name():
    """Return the current user's name in the format used in event log details."""
    return f"{current_user.username} {current_user.last_name}"


def _record_event(event_type, details):
    """Log an event for the current user and commit the session."""
    log_event(event_type=event_type, details=details, user_id=current_user.id)
    db.session.commit()


def _accessible_rooms():
    """Return every room for admins, otherwise the rooms the current user is a member of."""
    if current_user.is_admin:
        return Room.query.all()
    return Room.query.filter(Room.members.any(id=current_user.id)).all()


@room_files_bp.route('/<int:room_id>/files', methods=['GET'])
@login_required
def list_room_files(room_id):
    """
    List all files in a room's directory.

    Args:
        room_id (int): ID of the room to list files from

    Returns:
        JSON response containing list of files with their metadata
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_view'):
        abort(403)

    path = clean_path(request.args.get('path', ''))

    # Get files in the current path
    files = RoomFile.query.filter_by(room_id=room_id, path=path, deleted=False).all()

    # The permission dump costs an extra query, so only do it when debug
    # logging is actually enabled.
    if logger.isEnabledFor(logging.DEBUG) and not current_user.is_admin:
        perm = RoomMemberPermission.query.filter_by(room_id=room.id, user_id=current_user.id).first()
        flags = {flag: (getattr(perm, flag, False) if perm else False) for flag in _PERMISSION_FLAGS}
        logger.debug("User permissions in room %s: %s", room_id, flags)

    result = [_serialize_file(f, starred=current_user in f.starred_by) for f in files]
    logger.debug("Returning %d files", len(result))
    return jsonify(result)


@room_files_bp.route('/<int:room_id>/files/upload', methods=['POST'])
@login_required
def upload_room_file(room_id):
    """
    Upload a file into a room, optionally overwriting an existing one.

    Form fields: ``file`` (the upload), ``path`` (target folder, optional) and
    ``overwrite`` (``'true'`` to replace an existing or trashed file).

    Args:
        room_id (int): ID of the room to upload into

    Returns:
        JSON response indicating success, or an error with status 400/409
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_upload'):
        abort(403)
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    if not allowed_file(file.filename):
        return jsonify({'error': 'File type not allowed'}), 400

    # secure_filename can strip characters (or everything), so make sure the
    # sanitized name is still usable and still carries an allowed extension.
    filename = secure_filename(file.filename)
    if not filename or not allowed_file(filename):
        return jsonify({'error': 'Invalid filename'}), 400

    rel_path = clean_path(request.form.get('path', ''))
    target_dir = _room_path(room_id, rel_path)
    file_path = _room_path(room_id, rel_path, filename)
    if target_dir is None or file_path is None:
        return jsonify({'error': 'Invalid path'}), 400

    # Check for overwrite flag
    overwrite = request.form.get('overwrite', 'false').lower() == 'true'

    # First check for non-deleted files
    existing_file = RoomFile.query.filter_by(
        room_id=room_id, name=filename, path=rel_path, deleted=False
    ).first()
    if existing_file and not overwrite:
        return jsonify({'error': 'A file with this name already exists in this location', 'conflict': True}), 409

    # Then check for deleted files
    trashed_file = RoomFile.query.filter_by(
        room_id=room_id, name=filename, path=rel_path, deleted=True
    ).first()
    if trashed_file:
        if not overwrite:
            return jsonify({'error': 'A file with this name exists in the trash', 'conflict': True}), 409
        # Overwriting replaces the trashed copy on disk, so drop its record.
        db.session.delete(trashed_file)

    # Only touch the filesystem once the request has passed every check.
    os.makedirs(target_dir, exist_ok=True)
    file.save(file_path)
    stat = os.stat(file_path)

    overwritten = existing_file is not None
    if overwritten:
        # Overwrite: update the RoomFile record
        existing_file.size = stat.st_size
        existing_file.modified = stat.st_mtime
        existing_file.uploaded_by = current_user.id
        existing_file.uploaded_at = datetime.utcnow()
    else:
        db.session.add(RoomFile(
            room_id=room_id,
            name=filename,
            path=rel_path,
            type='file',
            size=stat.st_size,
            modified=stat.st_mtime,
            uploaded_by=current_user.id,
            uploaded_at=datetime.utcnow(),
        ))
    db.session.commit()

    _record_event('file_upload', {
        'uploaded_by': _actor_name(),
        'filename': filename,
        'room_id': room_id,
        'path': rel_path,
        'size': stat.st_size,
        'overwritten': overwritten,
    })

    response = {'success': True, 'filename': filename}
    if overwritten:
        response['overwritten'] = True
    return jsonify(response)


@room_files_bp.route('/<int:room_id>/files/<path:filename>', methods=['GET'])
@login_required
def download_room_file(room_id, filename):
    """
    Download or preview a file from a room.

    Args:
        room_id (int): ID of the room containing the file
        filename (str): Name of the file to download/preview

    Returns:
        File download/preview response or error message
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_download'):
        abort(403)

    rel_path = clean_path(request.args.get('path', ''))
    preview_mode = request.args.get('preview', 'false').lower() == 'true'

    # Lookup in RoomFile
    rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
    if not rf or rf.type != 'file':
        return jsonify({'error': 'File not found'}), 404

    directory = _room_path(room_id, rel_path)
    file_path = _room_path(room_id, rel_path, filename)
    if directory is None or file_path is None or not os.path.exists(file_path):
        return jsonify({'error': 'File not found'}), 404

    # Log the event
    _record_event('file_download' if not preview_mode else 'file_preview', {
        'user': _actor_name(),
        'filename': filename,
        'room_id': room_id,
        'path': rel_path,
        'size': rf.size,
        'preview': preview_mode,
    })

    # For preview mode, we don't set as_attachment
    return send_from_directory(directory, filename, as_attachment=not preview_mode)


@room_files_bp.route('/<int:room_id>/files/<path:filename>', methods=['DELETE'])
@login_required
def delete_file(room_id, filename):
    """
    Delete a file from a room (moves to trash).

    Args:
        room_id (int): ID of the room containing the file
        filename (str): Name of the file to delete

    Returns:
        JSON response indicating success or error
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_delete'):
        abort(403)

    rel_path = clean_path(request.args.get('path', ''))

    # Lookup in RoomFile
    rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
    if not rf:
        return jsonify({'error': 'File not found'}), 404

    # If it's a folder, mark all contained items as deleted
    if rf.type == 'folder':
        for item in _descendants(room_id, _join_rel(rf.path, rf.name)).all():
            item.deleted = True
            item.deleted_by = current_user.id
            item.deleted_at = datetime.utcnow()

    # Mark as deleted and record who deleted it and when
    rf.deleted = True
    rf.deleted_by = current_user.id
    rf.deleted_at = datetime.utcnow()
    db.session.commit()

    _record_event('file_delete', {
        'deleted_by': _actor_name(),
        'filename': filename,
        'room_id': room_id,
        'path': rel_path,
        'type': rf.type,
        'size': rf.size if rf.type == 'file' else None,
    })
    return jsonify({'success': True})


@room_files_bp.route('/<int:room_id>/folders', methods=['POST'])
@login_required
def create_room_folder(room_id):
    """
    Create a new folder in a room.

    Args:
        room_id (int): ID of the room to create folder in

    Returns:
        JSON response indicating success or error
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_upload'):
        abort(403)

    data = _json_body()
    folder_name = _str_field(data, 'name').strip()
    rel_path = clean_path(_str_field(data, 'path'))
    if not _is_valid_name(folder_name):
        return jsonify({'error': 'Invalid folder name'}), 400

    folder_path = _room_path(room_id, rel_path, folder_name)
    if folder_path is None:
        return jsonify({'error': 'Invalid path'}), 400

    # First check for trashed folder
    trashed_folder = RoomFile.query.filter_by(
        room_id=room_id, name=folder_name, path=rel_path, type='folder', deleted=True
    ).first()
    if trashed_folder:
        return jsonify({'error': 'A folder with this name exists in the trash'}), 409

    # Then check for existing folder in current location
    existing_folder = RoomFile.query.filter_by(
        room_id=room_id, name=folder_name, path=rel_path, type='folder', deleted=False
    ).first()
    if existing_folder or os.path.exists(folder_path):
        return jsonify({'error': 'A folder with this name already exists in this location'}), 400

    # Creates any missing parent directories as well.
    os.makedirs(folder_path)

    # Add RoomFile entry
    stat = os.stat(folder_path)
    db.session.add(RoomFile(
        room_id=room_id,
        name=folder_name,
        path=rel_path,
        type='folder',
        size=None,
        modified=stat.st_mtime,
        uploaded_by=current_user.id,
        uploaded_at=datetime.utcnow(),
    ))
    db.session.commit()

    _record_event('folder_create', {
        'created_by': _actor_name(),
        'folder_name': folder_name,
        'room_id': room_id,
        'path': rel_path,
    })
    return jsonify({'success': True, 'name': folder_name})


@room_files_bp.route('/<int:room_id>/rename', methods=['POST'])
@login_required
def rename_room_file(room_id):
    """
    Rename a file or folder in a room.

    Args:
        room_id (int): ID of the room containing the file/folder

    Returns:
        JSON response indicating success or error
    """
    room = Room.query.get_or_404(room_id)
    # Allow rename if user can upload or delete
    if not (user_has_permission(room, 'can_upload') or user_has_permission(room, 'can_delete')):
        abort(403)

    data = _json_body()
    old_name = _str_field(data, 'old_name').strip()
    new_name = _str_field(data, 'new_name').strip()
    rel_path = clean_path(_str_field(data, 'path'))
    if not old_name or not _is_valid_name(new_name):
        return jsonify({'error': 'Invalid name'}), 400

    # Lookup in RoomFile
    rf = RoomFile.query.filter_by(room_id=room_id, name=old_name, path=rel_path).first()
    if not rf:
        return jsonify({'error': 'Original file/folder not found'}), 404

    old_path = _room_path(room_id, rel_path, old_name)
    new_path = _room_path(room_id, rel_path, new_name)
    if old_path is None or new_path is None:
        return jsonify({'error': 'Invalid path'}), 400
    if not os.path.exists(old_path):
        return jsonify({'error': 'Original file/folder not found'}), 404
    if os.path.exists(new_path):
        return jsonify({'error': 'A file or folder with the new name already exists'}), 400

    # Prevent file extension change for files
    if os.path.isfile(old_path):
        old_ext = os.path.splitext(old_name)[1].lower()
        new_ext = os.path.splitext(new_name)[1].lower()
        if old_ext != new_ext:
            return jsonify({'error': 'File extension cannot be changed'}), 400

    os.rename(old_path, new_path)

    # Update RoomFile entry
    rf.name = new_name
    rf.modified = os.path.getmtime(new_path)

    # If this is a folder, update paths of all contained files and subfolders
    is_folder = os.path.isdir(new_path)
    if is_folder:
        _rebase_descendants(
            room_id,
            _join_rel(rel_path, old_name),
            _join_rel(rel_path, new_name),
        )
    db.session.commit()

    _record_event('file_rename', {
        'renamed_by': _actor_name(),
        'old_name': old_name,
        'new_name': new_name,
        'room_id': room_id,
        'path': rel_path,
        'type': rf.type,
        'size': rf.size if rf.type == 'file' else None,
        'is_folder': is_folder,
    })
    return jsonify({'success': True, 'old_name': old_name, 'new_name': new_name})


@room_files_bp.route('/<int:room_id>/download-zip', methods=['POST'])
@login_required
def download_zip(room_id):
    """
    Download multiple files as a zip archive.

    Requires both view and download permission, so the bulk endpoint cannot be
    used to get around the single-file download check.

    Args:
        room_id (int): ID of the room containing the files

    Returns:
        ZIP file download response
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_view') or not user_has_permission(room, 'can_download'):
        abort(403)

    items = _json_body().get('items', [])
    if not items or not isinstance(items, list):
        return jsonify({'error': 'No items selected'}), 400

    room_dir = get_room_dir(room_id)
    mem_zip = io.BytesIO()
    with zipfile.ZipFile(mem_zip, 'w', zipfile.ZIP_DEFLATED) as zf:
        for item in items:
            if not isinstance(item, dict):
                continue
            name = item.get('name')
            # An empty name would resolve to the folder itself (or the whole
            # room), so it is skipped along with non-string names.
            if not isinstance(name, str) or not name:
                continue
            rel_path = clean_path(_str_field(item, 'path'))
            abs_path = _room_path(room_id, rel_path, name)
            if abs_path is None or not os.path.exists(abs_path):
                continue
            if os.path.isfile(abs_path):
                zf.write(abs_path, arcname=name)
            elif os.path.isdir(abs_path):
                for root, _dirs, filenames in os.walk(abs_path):
                    for entry in filenames:
                        entry_path = os.path.join(root, entry)
                        zf.write(entry_path, arcname=os.path.relpath(entry_path, room_dir))
    mem_zip.seek(0)
    return send_file(mem_zip, mimetype='application/zip', as_attachment=True, download_name='download.zip')


@room_files_bp.route('/<int:room_id>/search', methods=['GET'])
@login_required
def search_room_files(room_id):
    """
    Search for files in a room by name, including files in subfolders.

    Args:
        room_id (int): ID of the room to search in

    Returns:
        JSON response containing matching files with their full paths
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_view'):
        abort(403)

    query = request.args.get('query', '').strip().lower()

    # Search RoomFile for this room
    files = RoomFile.query.filter(RoomFile.room_id == room_id, RoomFile.deleted == False).all()  # noqa: E712

    matches = []
    for f in files:
        # Create full path by combining folder path and filename
        full_path = _join_rel(f.path, f.name)
        if query in f.name.lower() or query in full_path.lower():
            matches.append(_serialize_file(
                f,
                full_path=full_path,
                starred=current_user in f.starred_by,
            ))
    return jsonify(matches)


@room_files_bp.route('/<int:room_id>/move', methods=['POST'])
@login_required
def move_room_file(room_id):
    """
    Move a file or folder to a different location within a room.

    When a folder is moved, the stored paths of everything inside it are
    updated as well.

    Args:
        room_id (int): ID of the room containing the file/folder

    Returns:
        JSON response indicating success or error
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_move'):
        abort(403)

    data = _json_body()
    filename = _str_field(data, 'filename').strip()
    source_path = clean_path(_str_field(data, 'source_path'))
    target_path = clean_path(_str_field(data, 'target_path'))
    if not _is_valid_name(filename):
        return jsonify({'error': 'Invalid filename'}), 400

    # Lookup in RoomFile
    rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=source_path).first()
    if not rf:
        return jsonify({'error': 'File not found'}), 404

    source_file_path = _room_path(room_id, source_path, filename)
    target_dir = _room_path(room_id, target_path)
    target_file_path = _room_path(room_id, target_path, filename)
    if source_file_path is None or target_dir is None or target_file_path is None:
        return jsonify({'error': 'Invalid path'}), 400
    if not os.path.exists(source_file_path):
        return jsonify({'error': 'Source file not found'}), 404
    if os.path.exists(target_file_path):
        return jsonify({'error': 'A file with this name already exists in the target location'}), 400

    old_rel = _join_rel(source_path, filename)
    new_rel = _join_rel(target_path, filename)
    is_folder = rf.type == 'folder'
    # A folder cannot be moved into itself or one of its own subfolders.
    if is_folder and (target_path == old_rel or target_path.startswith(old_rel + '/')):
        return jsonify({'error': 'Cannot move a folder into itself'}), 400

    # Create target directory if it doesn't exist
    os.makedirs(target_dir, exist_ok=True)

    # Move the file
    shutil.move(source_file_path, target_file_path)

    # Update RoomFile entry
    rf.path = target_path
    rf.modified = os.path.getmtime(target_file_path)
    if is_folder:
        _rebase_descendants(room_id, old_rel, new_rel)
    db.session.commit()

    _record_event('file_move', {
        'moved_by': _actor_name(),
        'filename': filename,
        'room_id': room_id,
        'source_path': source_path,
        'target_path': target_path,
        'type': rf.type,
        'size': rf.size if rf.type == 'file' else None,
    })
    return jsonify({'success': True})


@room_files_bp.route('/<int:room_id>/folders', methods=['GET'])
@login_required
def list_room_folders(room_id):
    """
    List all folders in a room.

    Args:
        room_id (int): ID of the room to list folders from

    Returns:
        JSON response containing list of folder paths
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_view'):
        abort(403)

    # Only non-deleted folder records are needed, so filter in the query.
    folder_records = RoomFile.query.filter_by(room_id=room_id, deleted=False, type='folder').all()

    # Unique full paths, sorted
    folder_list = sorted({_join_rel(f.path, f.name) for f in folder_records})
    return jsonify(folder_list)


@room_files_bp.route('/<int:room_id>/star', methods=['POST'])
@login_required
def toggle_star(room_id):
    """
    Toggle the starred status of a file.

    Args:
        room_id (int): ID of the room containing the file

    Returns:
        JSON response indicating success and new starred status
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_view'):
        abort(403)

    data = _json_body()
    filename = _str_field(data, 'filename').strip()
    rel_path = clean_path(_str_field(data, 'path'))
    if not filename:
        return jsonify({'error': 'Invalid filename'}), 400

    # Lookup in RoomFile
    rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
    if not rf:
        return jsonify({'error': 'File not found'}), 404

    # Check if the file is already starred by this user
    is_starred = current_user in rf.starred_by
    if is_starred:
        # Unstar the file
        rf.starred_by.remove(current_user)
        event_type, actor_key = 'file_unstar', 'unstarred_by'
    else:
        # Star the file
        rf.starred_by.append(current_user)
        event_type, actor_key = 'file_star', 'starred_by'

    _record_event(event_type, {
        actor_key: _actor_name(),
        'filename': filename,
        'room_id': room_id,
        'path': rel_path,
        'type': rf.type,
        'size': rf.size if rf.type == 'file' else None,
    })
    return jsonify({'success': True, 'starred': not is_starred})


@room_files_bp.route('/<int:room_id>/starred', methods=['GET'])
@login_required
def get_starred_files(room_id):
    """
    Get all files in a room that the current user has starred.

    Uses the same ``starred_by`` relationship that ``toggle_star`` maintains.

    Args:
        room_id (int): ID of the room to get starred files from

    Returns:
        JSON response containing list of starred files
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_view'):
        abort(403)

    files = RoomFile.query.filter(
        RoomFile.room_id == room_id,
        RoomFile.starred_by.contains(current_user),
    ).all()
    return jsonify([_serialize_file(f, starred=True) for f in files])


@room_files_bp.route('/starred', methods=['GET'])
@login_required
def get_all_starred_files():
    """
    Get all starred files across all accessible rooms.

    Returns:
        JSON response containing list of all starred files
    """
    # Get all rooms the user has access to
    room_names = {room.id: room.name for room in _accessible_rooms()}
    if not room_names:
        return jsonify([])

    # Get all files starred by the current user from accessible rooms
    files = RoomFile.query.filter(
        RoomFile.room_id.in_(list(room_names)),
        RoomFile.starred_by.contains(current_user),
    ).all()

    return jsonify([
        _serialize_file(
            f,
            starred=True,
            room_id=f.room_id,
            room_name=room_names.get(f.room_id),
        )
        for f in files
    ])


@room_files_bp.route('/trash', methods=['GET'])
@login_required
def get_trash_files():
    """
    Get all deleted files from accessible rooms.

    Returns:
        JSON response containing list of deleted files
    """
    # Get all rooms the user has access to
    rooms_by_id = {room.id: room for room in _accessible_rooms()}
    if not rooms_by_id:
        return jsonify([])

    # Get all deleted files from accessible rooms
    files = RoomFile.query.filter(
        RoomFile.room_id.in_(list(rooms_by_id)),
        RoomFile.deleted == True,  # noqa: E712
    ).all()

    # Delete permission is per room, so look it up once per room rather than
    # once per trashed file.
    can_delete_in = {}
    result = []
    for f in files:
        if f.room_id not in can_delete_in:
            room = rooms_by_id.get(f.room_id)
            can_delete_in[f.room_id] = user_has_permission(room, 'can_delete') if room else False

        # A user can restore only what they deleted themselves, and only
        # while they still hold delete permission in the room.
        can_restore = can_delete_in[f.room_id] and f.deleted_by == current_user.id

        room = rooms_by_id.get(f.room_id)
        result.append(_serialize_file(
            f,
            room_id=f.room_id,
            room_name=room.name if room else None,
            deleted_by=_display_name(f.deleter),
            deleted_at=f.deleted_at.isoformat() if f.deleted_at else None,
            can_restore=can_restore,
        ))
    return jsonify(result)


@room_files_bp.route('/<int:room_id>/restore', methods=['POST'])
@login_required
def restore_file(room_id):
    """
    Restore a deleted file from trash.

    Args:
        room_id (int): ID of the room containing the deleted file

    Returns:
        JSON response indicating success or error
    """
    room = Room.query.get_or_404(room_id)
    # Check for delete permission instead of view permission
    if not user_has_permission(room, 'can_delete'):
        abort(403)

    data = _json_body()
    filename = _str_field(data, 'filename').strip()
    rel_path = clean_path(_str_field(data, 'path'))
    if not filename:
        return jsonify({'error': 'Invalid filename'}), 400

    # Only trashed records can be restored.
    rf = RoomFile.query.filter_by(
        room_id=room_id, name=filename, path=rel_path, deleted=True
    ).first()
    if not rf:
        return jsonify({'error': 'File not found'}), 404

    # Check if the current user was the one who deleted the file
    if rf.deleted_by != current_user.id:
        return jsonify({'error': 'You can only restore files that you deleted'}), 403

    # Restore file by setting deleted to False
    rf.deleted = False
    db.session.commit()

    _record_event('file_restore', {
        'restored_by': _actor_name(),
        'filename': filename,
        'room_id': room_id,
        'path': rel_path,
        'type': rf.type,
        'size': rf.size if rf.type == 'file' else None,
    })
    return jsonify({'success': True})


@room_files_bp.route('/<int:room_id>/delete-permanent', methods=['POST'])
@login_required
def delete_permanent(room_id):
    """
    Permanently delete files from trash (admin only).

    A ``filename`` of ``'*'`` purges every trashed entry in the room.
    Failures on individual entries are logged and skipped.

    Args:
        room_id (int): ID of the room containing the files to delete

    Returns:
        JSON response indicating success or error
    """
    # Only allow admin users to permanently delete files
    if not current_user.is_admin:
        abort(403)
    Room.query.get_or_404(room_id)

    data = _json_body()
    filename = _str_field(data, 'filename').strip()
    rel_path = clean_path(_str_field(data, 'path'))
    if not filename:
        return jsonify({'error': 'Invalid filename'}), 400

    if filename == '*':
        # Delete all deleted files in the room
        files_to_delete = RoomFile.query.filter_by(room_id=room_id, deleted=True).all()
    else:
        # Lookup specific trashed file
        target = RoomFile.query.filter_by(
            room_id=room_id, name=filename, path=rel_path, deleted=True
        ).first()
        if not target:
            return jsonify({'error': 'File not found'}), 404
        files_to_delete = [target]

    # Records already removed as part of a purged folder, keyed by object
    # identity (the dict keeps the objects alive so the ids stay unique).
    purged = {}
    for rf in files_to_delete:
        if id(rf) in purged:
            continue
        try:
            # Delete the file/folder from storage
            file_path = _room_path(room_id, rf.path or '', rf.name)
            if file_path is None:
                logger.warning("Skipping storage removal for unsafe path %r/%r", rf.path, rf.name)
            elif os.path.exists(file_path):
                if rf.type == 'file':
                    os.remove(file_path)
                elif rf.type == 'folder':
                    # For folders, we need to delete all contents recursively
                    shutil.rmtree(file_path)

            # Drop the records of everything inside a folder even when the
            # folder is already gone from disk, so no orphans are left behind.
            if rf.type == 'folder':
                for item in _descendants(room_id, _join_rel(rf.path, rf.name)).all():
                    if id(item) not in purged:
                        purged[id(item)] = item
                        db.session.delete(item)

            # Delete the database record
            purged[id(rf)] = rf
            db.session.delete(rf)
            log_event(
                event_type='file_delete_permanent',
                details={
                    'deleted_by': _actor_name(),
                    'filename': rf.name,
                    'room_id': room_id,
                    'path': rf.path,
                    'type': rf.type,
                    'size': rf.size if rf.type == 'file' else None,
                },
                user_id=current_user.id,
            )
        except Exception:
            # Best effort: keep purging the remaining entries.
            logger.exception("Error deleting file %s", rf.name)
            continue

    # Commit all changes
    try:
        db.session.commit()
    except Exception:
        logger.exception("Error committing changes")
        db.session.rollback()
        return jsonify({'error': 'Failed to delete files'}), 500

    return jsonify({'success': True})