Files
docupulse/routes/room_files.py
2025-06-06 10:12:46 +02:00

1095 lines
38 KiB
Python

"""
Room Files Management Module
This module provides a Flask Blueprint for managing files within rooms in the DocuPulse application.
It handles file operations such as uploading, downloading, moving, deleting, and organizing files
within room contexts. The module implements permission-based access control and supports various
file operations including file organization, starring, and trash management.
Key Features:
- File upload and download with permission checks
- Folder creation and management
- File operations (move, rename, delete)
- File starring system
- Trash management with restore capability
- Bulk operations (zip download)
- File search functionality
The module uses a combination of database records (RoomFile model) and actual file system storage
to maintain file metadata and content.
"""
from flask import Blueprint, jsonify, request, abort, send_from_directory, send_file
from flask_login import login_required, current_user
import os
from models import Room, RoomMemberPermission, RoomFile, TrashedFile, db, User, Notif
from werkzeug.utils import secure_filename, safe_join
import time
import shutil
import io
import zipfile
from datetime import datetime
from utils import log_event, create_notification, get_unread_count
# Blueprint grouping every room file endpoint in this module; all routes are mounted under /api/rooms
room_files_bp = Blueprint('room_files', __name__, url_prefix='/api/rooms')
# Root directory for storing room files; get_room_dir() appends the room ID to it
DATA_ROOT = '/app/uploads/rooms' # Updated to match Docker volume mount point
# Set of allowed file extensions for upload (lower-case, without the leading dot);
# allowed_file() lower-cases the uploaded name's extension before checking membership here
ALLOWED_EXTENSIONS = {
# Documents
'pdf', 'docx', 'doc', 'txt', 'rtf', 'odt', 'md', 'csv',
# Spreadsheets
'xlsx', 'xls', 'ods', 'xlsm',
# Presentations
'pptx', 'ppt', 'odp',
# Images
'jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp', 'tiff',
# Archives
'zip', 'rar', '7z', 'tar', 'gz',
# Code/Text
'py', 'js', 'html', 'css', 'json', 'xml', 'sql', 'sh', 'bat',
# Audio
'mp3', 'wav', 'ogg', 'm4a', 'flac',
# Video
'mp4', 'avi', 'mov', 'wmv', 'flv', 'mkv', 'webm',
# CAD/Design
'dwg', 'dxf', 'ai', 'psd', 'eps', 'indd',
# Other
'eml', 'msg', 'vcf', 'ics'
}
@room_files_bp.context_processor
def inject_unread_notifications():
    """
    Provide the unread notification count to templates rendered by this blueprint.
    Returns:
        dict: A mapping with the single key 'unread_notifications'; the value is
        the result of get_unread_count() for a logged-in user and 0 otherwise
    """
    if not current_user.is_authenticated:
        return {'unread_notifications': 0}
    return {'unread_notifications': get_unread_count(current_user.id)}
def get_room_dir(room_id):
    """
    Build the storage directory path that belongs to a room.
    Args:
        room_id (int): The ID of the room
    Returns:
        str: DATA_ROOT joined with the room ID used as a directory name
    """
    room_folder = str(room_id)
    return os.path.join(DATA_ROOT, room_folder)
def user_has_permission(room, perm_name):
    """
    Tell whether the logged-in user holds a given permission in a room.
    Admin users are granted every permission without a lookup.
    Args:
        room (Room): The room whose membership record is consulted
        perm_name (str): Attribute name of the permission on the membership record
    Returns:
        bool: The permission attribute's value, or False when the user has no
        membership record or the record has no such attribute
    """
    if current_user.is_admin:
        return True
    membership = RoomMemberPermission.query.filter_by(
        room_id=room.id,
        user_id=current_user.id
    ).first()
    if not membership:
        return False
    return getattr(membership, perm_name, False)
def allowed_file(filename):
    """
    Decide whether a filename carries an extension accepted for upload.
    Args:
        filename (str): Name of the file to check
    Returns:
        bool: True when the text after the last dot, lower-cased, is in
        ALLOWED_EXTENSIONS; False when there is no dot or the extension is unknown
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
def clean_path(path):
    """
    Normalise a client-supplied relative path so it cannot leave its base directory.
    Backslashes are treated as separators, leading/trailing/duplicate separators
    are removed, and '.' and '..' segments are dropped. Dropping (rather than
    resolving) '..' guarantees the result never climbs above the directory it is
    later joined to.
    Args:
        path (str): Path string to clean; may be None or empty
    Returns:
        str: '/'-separated relative path without empty, '.' or '..' segments
        ('' when nothing remains)
    """
    if not path:
        return ''
    segments = path.replace('\\', '/').split('/')
    # Empty segments come from leading, trailing or doubled separators
    return '/'.join(seg for seg in segments if seg not in ('', '.', '..'))
@room_files_bp.route('/<int:room_id>/files', methods=['GET'])
@login_required
def list_room_files(room_id):
    """
    List the non-deleted files and folders stored directly under a path in a room.
    The folder to list is taken from the 'path' query parameter (room root when
    omitted). Requires the 'can_view' permission.
    Args:
        room_id (int): ID of the room to list files from
    Returns:
        JSON response containing list of files with their metadata
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_view'):
        abort(403)
    path = clean_path(request.args.get('path', ''))
    # Get files in the current path
    files = RoomFile.query.filter_by(room_id=room_id, path=path, deleted=False).all()
    result = []
    for f in files:
        uploader_full_name = None
        uploader_profile_pic = None
        if f.uploader:
            uploader_full_name = f.uploader.username
            if getattr(f.uploader, 'last_name', None):
                uploader_full_name += ' ' + f.uploader.last_name
            uploader_profile_pic = f.uploader.profile_picture if getattr(f.uploader, 'profile_picture', None) else None
        result.append({
            'name': f.name,
            'type': f.type,
            # Folders have no meaningful size; the client receives a dash instead
            'size': f.size if f.type == 'file' else '-',
            'modified': f.modified,
            'uploaded_by': uploader_full_name,
            'uploader_profile_pic': uploader_profile_pic,
            'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
            'path': f.path,
            'starred': current_user in f.starred_by
        })
    return jsonify(result)
@room_files_bp.route('/<int:room_id>/files/upload', methods=['POST'])
@login_required
def upload_room_file(room_id):
    """
    Upload a file into a room, optionally overwriting an existing entry.
    Reads the multipart field 'file' plus the form fields 'path' (target folder
    relative to the room) and 'overwrite' ('true' to replace an existing or
    trashed file of the same name). Requires the 'can_upload' permission.
    Args:
        room_id (int): ID of the room to upload into
    Returns:
        JSON response with 'success' and 'filename' (and 'overwritten' when an
        existing record was replaced); 400 for invalid input, 409 with
        'conflict' when the name is taken and overwrite was not requested
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_upload'):
        abort(403)
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    if not allowed_file(file.filename):
        return jsonify({'error': 'File type not allowed'}), 400
    filename = secure_filename(file.filename)
    # secure_filename may strip every character; saving would then target the folder itself
    if not filename:
        return jsonify({'error': 'Invalid filename'}), 400
    room_dir = get_room_dir(room_id)
    rel_path = clean_path(request.form.get('path', ''))
    # safe_join returns None when the joined path would leave the room directory
    target_dir = safe_join(room_dir, rel_path) if rel_path else room_dir
    if target_dir is None:
        return jsonify({'error': 'Invalid path'}), 400
    # Check for overwrite flag
    overwrite = request.form.get('overwrite', 'false').lower() == 'true'
    # First check for non-deleted files
    existing_file = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path, deleted=False).first()
    if existing_file and not overwrite:
        return jsonify({'error': 'A file with this name already exists in this location', 'conflict': True}), 409
    # Then check for deleted files
    trashed_file = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path, deleted=True).first()
    if trashed_file:
        # If we're not overwriting, return conflict
        if not overwrite:
            return jsonify({'error': 'A file with this name exists in the trash', 'conflict': True}), 409
        # Overwriting: drop the trash record. A live record (existing_file), if
        # any, is kept and updated below so no duplicate row is created.
        db.session.delete(trashed_file)
    # Only touch the file system once every validation has passed
    os.makedirs(target_dir, exist_ok=True)
    file_path = os.path.join(target_dir, filename)
    file.save(file_path)
    stat = os.stat(file_path)
    overwritten = existing_file is not None
    if overwritten:
        # Overwrite: update the RoomFile record
        existing_file.size = stat.st_size
        existing_file.modified = stat.st_mtime
        existing_file.uploaded_by = current_user.id
        existing_file.uploaded_at = datetime.utcnow()
    else:
        db.session.add(RoomFile(
            room_id=room_id,
            name=filename,
            path=rel_path,
            type='file',
            size=stat.st_size,
            modified=stat.st_mtime,
            uploaded_by=current_user.id,
            uploaded_at=datetime.utcnow()
        ))
    db.session.commit()
    log_event(
        event_type='file_upload',
        details={
            'uploaded_by': f"{current_user.username} {current_user.last_name}",
            'filename': filename,
            'room_id': room_id,
            'path': rel_path,
            'size': stat.st_size,
            'overwritten': overwritten
        },
        user_id=current_user.id
    )
    db.session.commit()
    response = {'success': True, 'filename': filename}
    if overwritten:
        response['overwritten'] = True
    return jsonify(response)
@room_files_bp.route('/<int:room_id>/files/<filename>', methods=['GET'])
@login_required
def download_room_file(room_id, filename):
    """
    Serve a room file either as an attachment or inline for preview.
    The containing folder comes from the 'path' query parameter; passing
    'preview=true' serves the file inline. Requires the 'can_download' permission.
    Args:
        room_id (int): ID of the room containing the file
        filename (str): Name of the file to download/preview
    Returns:
        File download/preview response, or a JSON 404 error when the file has no
        record, is not of type 'file', or is missing on disk
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_download'):
        abort(403)
    rel_path = clean_path(request.args.get('path', ''))
    preview_mode = request.args.get('preview', 'false').lower() == 'true'
    # The file must be registered in the database as a regular file
    record = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
    if not record or record.type != 'file':
        return jsonify({'error': 'File not found'}), 404
    path_parts = [get_room_dir(room_id)]
    if rel_path:
        path_parts.append(rel_path)
    path_parts.append(filename)
    file_path = os.path.join(*path_parts)
    if not os.path.exists(file_path):
        return jsonify({'error': 'File not found'}), 404
    # Record the access, distinguishing previews from real downloads
    event_name = 'file_preview' if preview_mode else 'file_download'
    log_event(
        event_type=event_name,
        details={
            'user': f"{current_user.username} {current_user.last_name}",
            'filename': filename,
            'room_id': room_id,
            'path': rel_path,
            'size': record.size,
            'preview': preview_mode
        },
        user_id=current_user.id
    )
    db.session.commit()
    # Previews are served inline, downloads as an attachment
    containing_dir = os.path.dirname(file_path)
    return send_from_directory(containing_dir, filename, as_attachment=not preview_mode)
@room_files_bp.route('/<int:room_id>/files/<path:filename>', methods=['DELETE'])
@login_required
def delete_file(room_id, filename):
    """
    Delete a file or folder from a room (soft delete: the record moves to trash).
    The containing folder comes from the 'path' query parameter. Deleting a
    folder also marks every record stored inside it as deleted. Nothing is
    removed from disk here. Requires the 'can_delete' permission.
    Args:
        room_id (int): ID of the room containing the file
        filename (str): Name of the file to delete
    Returns:
        JSON response indicating success or error
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_delete'):
        abort(403)
    rel_path = clean_path(request.args.get('path', ''))
    # Lookup in RoomFile
    rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
    if not rf:
        return jsonify({'error': 'File not found'}), 404
    deleted_at = datetime.utcnow()
    # If it's a folder, mark all contained items as deleted
    if rf.type == 'folder':
        folder_path = f"{rf.path}/{rf.name}" if rf.path else rf.name
        # Match the folder itself or true descendants only. A bare prefix match
        # would also hit siblings such as "docs2" when deleting "docs", and
        # autoescape keeps '_' / '%' in names from acting as LIKE wildcards.
        contained_items = RoomFile.query.filter(
            RoomFile.room_id == room_id,
            (RoomFile.path == folder_path)
            | RoomFile.path.startswith(f"{folder_path}/", autoescape=True)
        ).all()
        for item in contained_items:
            item.deleted = True
            item.deleted_by = current_user.id
            item.deleted_at = deleted_at
    # Mark as deleted and record who deleted it and when
    rf.deleted = True
    rf.deleted_by = current_user.id
    rf.deleted_at = deleted_at
    db.session.commit()
    log_event(
        event_type='file_delete',
        details={
            'deleted_by': f"{current_user.username} {current_user.last_name}",
            'filename': filename,
            'room_id': room_id,
            'path': rel_path,
            'type': rf.type,
            'size': rf.size if rf.type == 'file' else None
        },
        user_id=current_user.id
    )
    db.session.commit()
    return jsonify({'success': True})
@room_files_bp.route('/<int:room_id>/folders', methods=['POST'])
@login_required
def create_room_folder(room_id):
    """
    Create a new folder in a room.
    Expects a JSON body with 'name' (folder name) and optional 'path' (parent
    folder relative to the room). Requires the 'can_upload' permission.
    Args:
        room_id (int): ID of the room to create folder in
    Returns:
        JSON response indicating success or error (400 for an invalid name/path
        or an existing folder, 409 when a trashed folder holds the name)
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_upload'):
        abort(403)
    # A missing or malformed JSON body is treated as empty input
    data = request.get_json(silent=True) or {}
    folder_name = data.get('name', '').strip()
    rel_path = clean_path(data.get('path', ''))
    if not folder_name or '/' in folder_name or '\\' in folder_name or folder_name.startswith('.'):
        return jsonify({'error': 'Invalid folder name'}), 400
    room_dir = get_room_dir(room_id)
    # safe_join returns None when the joined path would leave the room directory
    target_dir = safe_join(room_dir, rel_path) if rel_path else room_dir
    if target_dir is None:
        return jsonify({'error': 'Invalid path'}), 400
    folder_path = os.path.join(target_dir, folder_name)
    # First check for trashed folder
    trashed_folder = RoomFile.query.filter_by(
        room_id=room_id,
        name=folder_name,
        path=rel_path,
        type='folder',
        deleted=True
    ).first()
    if trashed_folder:
        return jsonify({'error': 'A folder with this name exists in the trash'}), 409
    # Then check for existing folder in current location
    existing_folder = RoomFile.query.filter_by(
        room_id=room_id,
        name=folder_name,
        path=rel_path,
        type='folder',
        deleted=False
    ).first()
    if existing_folder:
        return jsonify({'error': 'A folder with this name already exists in this location'}), 400
    if os.path.exists(folder_path):
        return jsonify({'error': 'A folder with this name already exists in this location'}), 400
    # Creates missing parent directories as well, but only after all checks passed
    os.makedirs(folder_path)
    # Add RoomFile entry
    stat = os.stat(folder_path)
    rf = RoomFile(
        room_id=room_id,
        name=folder_name,
        path=rel_path,
        type='folder',
        size=None,
        modified=stat.st_mtime,
        uploaded_by=current_user.id,
        uploaded_at=datetime.utcnow()
    )
    db.session.add(rf)
    db.session.commit()
    log_event(
        event_type='folder_create',
        details={
            'created_by': f"{current_user.username} {current_user.last_name}",
            'folder_name': folder_name,
            'room_id': room_id,
            'path': rel_path
        },
        user_id=current_user.id
    )
    db.session.commit()
    return jsonify({'success': True, 'name': folder_name})
@room_files_bp.route('/<int:room_id>/rename', methods=['POST'])
@login_required
def rename_room_file(room_id):
    """
    Rename a file or folder in a room.
    Expects a JSON body with 'old_name', 'new_name' and optional 'path'
    (containing folder). A file's extension cannot be changed. Renaming a
    folder also rewrites the stored paths of everything inside it.
    Args:
        room_id (int): ID of the room containing the file/folder
    Returns:
        JSON response indicating success or error
    """
    room = Room.query.get_or_404(room_id)
    # Allow rename if user can upload or delete
    if not (user_has_permission(room, 'can_upload') or user_has_permission(room, 'can_delete')):
        abort(403)
    # A missing or malformed JSON body is treated as empty input
    data = request.get_json(silent=True) or {}
    old_name = data.get('old_name', '').strip()
    new_name = data.get('new_name', '').strip()
    rel_path = clean_path(data.get('path', ''))
    if not old_name or not new_name or '/' in new_name or '\\' in new_name or new_name.startswith('.'):
        return jsonify({'error': 'Invalid name'}), 400
    # Lookup in RoomFile
    rf = RoomFile.query.filter_by(room_id=room_id, name=old_name, path=rel_path).first()
    if not rf:
        return jsonify({'error': 'Original file/folder not found'}), 404
    room_dir = get_room_dir(room_id)
    # safe_join returns None when the joined path would leave the room directory
    base_dir = safe_join(room_dir, rel_path) if rel_path else room_dir
    if base_dir is None:
        return jsonify({'error': 'Invalid path'}), 400
    old_path = os.path.join(base_dir, old_name)
    new_path = os.path.join(base_dir, new_name)
    if not os.path.exists(old_path):
        return jsonify({'error': 'Original file/folder not found'}), 404
    if os.path.exists(new_path):
        return jsonify({'error': 'A file or folder with the new name already exists'}), 400
    # Prevent file extension change for files
    if os.path.isfile(old_path):
        old_ext = os.path.splitext(old_name)[1].lower()
        new_ext = os.path.splitext(new_name)[1].lower()
        if old_ext != new_ext:
            return jsonify({'error': 'File extension cannot be changed'}), 400
    os.rename(old_path, new_path)
    # Update RoomFile entry
    rf.name = new_name
    rf.modified = os.path.getmtime(new_path)
    is_folder = os.path.isdir(new_path)
    # If this is a folder, update paths of all contained files and subfolders
    if is_folder:
        old_folder_path = f"{rel_path}/{old_name}" if rel_path else old_name
        new_folder_path = f"{rel_path}/{new_name}" if rel_path else new_name
        # Match the folder itself or true descendants only. A bare prefix match
        # would also rewrite siblings such as "docs2" when renaming "docs", and
        # autoescape keeps '_' / '%' in names from acting as LIKE wildcards.
        contained_items = RoomFile.query.filter(
            RoomFile.room_id == room_id,
            (RoomFile.path == old_folder_path)
            | RoomFile.path.startswith(f"{old_folder_path}/", autoescape=True)
        ).all()
        for item in contained_items:
            # Swap the leading folder path, keep the remainder untouched
            item.path = new_folder_path + item.path[len(old_folder_path):]
    db.session.commit()
    log_event(
        event_type='file_rename',
        details={
            'renamed_by': f"{current_user.username} {current_user.last_name}",
            'old_name': old_name,
            'new_name': new_name,
            'room_id': room_id,
            'path': rel_path,
            'type': rf.type,
            'size': rf.size if rf.type == 'file' else None,
            'is_folder': is_folder
        },
        user_id=current_user.id
    )
    db.session.commit()
    return jsonify({'success': True, 'old_name': old_name, 'new_name': new_name})
@room_files_bp.route('/<int:room_id>/download-zip', methods=['POST'])
@login_required
def download_zip(room_id):
    """
    Download multiple files and folders as a single zip archive.
    Expects a JSON body with 'items', a list of objects carrying 'name' and an
    optional 'path' (containing folder). Entries that are malformed, resolve
    outside the room directory, or do not exist on disk are skipped. Requires
    the 'can_download' permission, the same as single-file downloads.
    Args:
        room_id (int): ID of the room containing the files
    Returns:
        ZIP file download response, or a JSON 400 error when no items are given
    """
    room = Room.query.get_or_404(room_id)
    # Zipping hands out file contents, so it needs the download permission
    if not user_has_permission(room, 'can_download'):
        abort(403)
    # A missing or malformed JSON body is treated as empty input
    data = request.get_json(silent=True) or {}
    items = data.get('items', [])
    if not items or not isinstance(items, list):
        return jsonify({'error': 'No items selected'}), 400
    room_dir = get_room_dir(room_id)
    mem_zip = io.BytesIO()
    with zipfile.ZipFile(mem_zip, 'w', zipfile.ZIP_DEFLATED) as zf:
        for item in items:
            if not isinstance(item, dict):
                continue
            name = item.get('name')
            if not name or not isinstance(name, str):
                continue
            rel_path = clean_path(item.get('path', ''))
            # safe_join returns None for absolute names or '..' escapes, which
            # would otherwise let a client read arbitrary files on the server
            abs_path = safe_join(room_dir, rel_path, name)
            if abs_path is None or not os.path.exists(abs_path):
                continue
            if os.path.isfile(abs_path):
                zf.write(abs_path, arcname=name)
            elif os.path.isdir(abs_path):
                for root, _dirs, files in os.walk(abs_path):
                    for file in files:
                        file_path = os.path.join(root, file)
                        # Folder contents keep their path relative to the room root
                        arcname = os.path.relpath(file_path, room_dir)
                        zf.write(file_path, arcname=arcname)
    mem_zip.seek(0)
    return send_file(mem_zip, mimetype='application/zip', as_attachment=True, download_name='download.zip')
@room_files_bp.route('/<int:room_id>/search', methods=['GET'])
@login_required
def search_room_files(room_id):
    """
    Find non-deleted files and folders in a room whose name or full path contains a term.
    The term comes from the 'query' query parameter and is matched
    case-insensitively across every folder of the room. Requires the
    'can_view' permission.
    Args:
        room_id (int): ID of the room to search in
    Returns:
        JSON response containing matching files with their full paths
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_view'):
        abort(403)
    needle = request.args.get('query', '').strip().lower()
    candidates = RoomFile.query.filter(RoomFile.room_id==room_id, RoomFile.deleted==False).all()
    matches = []
    for entry in candidates:
        # Full path is the folder path joined with the entry name
        full_path = f"{entry.path}/{entry.name}" if entry.path else entry.name
        if needle not in entry.name.lower() and needle not in full_path.lower():
            continue
        owner_name = None
        owner_picture = None
        if entry.uploader:
            owner_name = entry.uploader.username
            if getattr(entry.uploader, 'last_name', None):
                owner_name += ' ' + entry.uploader.last_name
            if getattr(entry.uploader, 'profile_picture', None):
                owner_picture = entry.uploader.profile_picture
        uploaded_at = entry.uploaded_at.isoformat() if entry.uploaded_at else None
        matches.append({
            'name': entry.name,
            'type': entry.type,
            'size': entry.size if entry.type == 'file' else '-',
            'modified': entry.modified,
            'uploaded_by': owner_name,
            'uploader_profile_pic': owner_picture,
            'uploaded_at': uploaded_at,
            'path': entry.path,
            'full_path': full_path,
            'starred': current_user in entry.starred_by
        })
    return jsonify(matches)
@room_files_bp.route('/<int:room_id>/move', methods=['POST'])
@login_required
def move_room_file(room_id):
    """
    Move a file or folder to a different location within a room.
    Expects a JSON body with 'filename', 'source_path' and 'target_path' (both
    relative to the room). Moving a folder also rewrites the stored paths of
    everything inside it. Requires the 'can_move' permission.
    Args:
        room_id (int): ID of the room containing the file/folder
    Returns:
        JSON response indicating success or error
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_move'):
        abort(403)
    # A missing or malformed JSON body is treated as empty input
    data = request.get_json(silent=True) or {}
    filename = data.get('filename', '').strip()
    source_path = clean_path(data.get('source_path', ''))
    target_path = clean_path(data.get('target_path', ''))
    if not filename or '/' in filename or '\\' in filename or filename.startswith('.'):
        return jsonify({'error': 'Invalid filename'}), 400
    # Lookup in RoomFile
    rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=source_path).first()
    if not rf:
        return jsonify({'error': 'File not found'}), 404
    room_dir = get_room_dir(room_id)
    # safe_join returns None when the joined path would leave the room directory
    source_dir = safe_join(room_dir, source_path) if source_path else room_dir
    target_dir = safe_join(room_dir, target_path) if target_path else room_dir
    if source_dir is None or target_dir is None:
        return jsonify({'error': 'Invalid path'}), 400
    # Room-relative location of the item before and after the move
    old_location = f"{source_path}/{filename}" if source_path else filename
    new_location = f"{target_path}/{filename}" if target_path else filename
    # A folder cannot be moved into itself or one of its own subfolders
    if rf.type == 'folder' and (target_path == old_location or target_path.startswith(f"{old_location}/")):
        return jsonify({'error': 'Cannot move a folder into itself'}), 400
    source_file_path = os.path.join(source_dir, filename)
    target_file_path = os.path.join(target_dir, filename)
    if not os.path.exists(source_file_path):
        return jsonify({'error': 'Source file not found'}), 404
    if os.path.exists(target_file_path):
        return jsonify({'error': 'A file with this name already exists in the target location'}), 400
    # Create target directory if it doesn't exist
    os.makedirs(target_dir, exist_ok=True)
    # Move the file
    shutil.move(source_file_path, target_file_path)
    if rf.type == 'folder':
        # Keep the records of everything inside the folder in sync with disk;
        # autoescape keeps '_' / '%' in names from acting as LIKE wildcards
        contained_items = RoomFile.query.filter(
            RoomFile.room_id == room_id,
            (RoomFile.path == old_location)
            | RoomFile.path.startswith(f"{old_location}/", autoescape=True)
        ).all()
        for item in contained_items:
            item.path = new_location + item.path[len(old_location):]
    # Update RoomFile entry
    rf.path = target_path
    rf.modified = os.path.getmtime(target_file_path)
    db.session.commit()
    log_event(
        event_type='file_move',
        details={
            'moved_by': f"{current_user.username} {current_user.last_name}",
            'filename': filename,
            'room_id': room_id,
            'source_path': source_path,
            'target_path': target_path,
            'type': rf.type,
            'size': rf.size if rf.type == 'file' else None
        },
        user_id=current_user.id
    )
    db.session.commit()
    return jsonify({'success': True})
@room_files_bp.route('/<int:room_id>/folders', methods=['GET'])
@login_required
def list_room_folders(room_id):
    """
    Return the full paths of every non-deleted folder in a room, sorted.
    Requires the 'can_view' permission.
    Args:
        room_id (int): ID of the room to list folders from
    Returns:
        JSON response containing list of folder paths
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_view'):
        abort(403)
    entries = RoomFile.query.filter_by(room_id=room_id, deleted=False).all()
    # A set removes duplicate paths before sorting
    unique_paths = {
        (entry.path + '/' + entry.name if entry.path else entry.name)
        for entry in entries
        if entry.type == 'folder'
    }
    return jsonify(sorted(unique_paths))
@room_files_bp.route('/<int:room_id>/star', methods=['POST'])
@login_required
def toggle_star(room_id):
    """
    Flip the current user's star on a file or folder.
    Expects a JSON body with 'filename' and optional 'path' (containing
    folder). Requires the 'can_view' permission.
    Args:
        room_id (int): ID of the room containing the file
    Returns:
        JSON response indicating success and new starred status
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_view'):
        abort(403)
    payload = request.get_json()
    filename = payload.get('filename', '').strip()
    rel_path = clean_path(payload.get('path', ''))
    if not filename:
        return jsonify({'error': 'Invalid filename'}), 400
    record = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
    if not record:
        return jsonify({'error': 'File not found'}), 404
    # Stars are per user: membership in starred_by decides the current state
    is_starred = current_user in record.starred_by
    if is_starred:
        record.starred_by.remove(current_user)
        event_name, actor_key = 'file_unstar', 'unstarred_by'
    else:
        record.starred_by.append(current_user)
        event_name, actor_key = 'file_star', 'starred_by'
    log_event(
        event_type=event_name,
        details={
            actor_key: f"{current_user.username} {current_user.last_name}",
            'filename': filename,
            'room_id': room_id,
            'path': rel_path,
            'type': record.type,
            'size': record.size if record.type == 'file' else None
        },
        user_id=current_user.id
    )
    db.session.commit()
    return jsonify({'success': True, 'starred': not is_starred})
@room_files_bp.route('/<int:room_id>/starred', methods=['GET'])
@login_required
def get_starred_files(room_id):
    """
    Get the files in a room that the current user has starred.
    Stars are tracked per user through the starred_by relationship (the same
    one toggle_star and get_all_starred_files use), so that relationship is
    queried here as well. Requires the 'can_view' permission.
    Args:
        room_id (int): ID of the room to get starred files from
    Returns:
        JSON response containing list of starred files
    """
    room = Room.query.get_or_404(room_id)
    if not user_has_permission(room, 'can_view'):
        abort(403)
    # Files in this room starred by the current user
    files = RoomFile.query.filter(
        RoomFile.room_id == room_id,
        RoomFile.starred_by.contains(current_user)
    ).all()
    result = []
    for f in files:
        uploader_full_name = None
        uploader_profile_pic = None
        if f.uploader:
            uploader_full_name = f.uploader.username
            if getattr(f.uploader, 'last_name', None):
                uploader_full_name += ' ' + f.uploader.last_name
            uploader_profile_pic = f.uploader.profile_picture if getattr(f.uploader, 'profile_picture', None) else None
        result.append({
            'name': f.name,
            'type': f.type,
            'size': f.size if f.type == 'file' else '-',
            'modified': f.modified,
            'uploaded_by': uploader_full_name,
            'uploader_profile_pic': uploader_profile_pic,
            'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
            'path': f.path,
            # Every row returned by the query is starred by this user
            'starred': True
        })
    return jsonify(result)
@room_files_bp.route('/starred', methods=['GET'])
@login_required
def get_all_starred_files():
    """
    Collect the current user's starred files from every room they can access.
    Admins see all rooms; other users only the rooms they are a member of.
    Returns:
        JSON response containing list of all starred files
    """
    if current_user.is_admin:
        accessible_rooms = Room.query.all()
    else:
        accessible_rooms = Room.query.filter(Room.members.any(id=current_user.id)).all()
    room_ids = []
    room_names = {}
    for room in accessible_rooms:
        room_ids.append(room.id)
        room_names[room.id] = room.name
    starred_entries = RoomFile.query.filter(
        RoomFile.room_id.in_(room_ids),
        RoomFile.starred_by.contains(current_user)
    ).all()
    result = []
    for entry in starred_entries:
        owner_name = None
        owner_picture = None
        if entry.uploader:
            owner_name = entry.uploader.username
            if getattr(entry.uploader, 'last_name', None):
                owner_name += ' ' + entry.uploader.last_name
            if getattr(entry.uploader, 'profile_picture', None):
                owner_picture = entry.uploader.profile_picture
        uploaded_at = entry.uploaded_at.isoformat() if entry.uploaded_at else None
        result.append({
            'name': entry.name,
            'type': entry.type,
            'size': entry.size if entry.type == 'file' else '-',
            'modified': entry.modified,
            'uploaded_by': owner_name,
            'uploader_profile_pic': owner_picture,
            'uploaded_at': uploaded_at,
            'path': entry.path,
            'starred': True,
            'room_id': entry.room_id,
            'room_name': room_names.get(entry.room_id)
        })
    return jsonify(result)
@room_files_bp.route('/trash', methods=['GET'])
@login_required
def get_trash_files():
    """
    Get all deleted files from accessible rooms.
    Admins see all rooms; other users only the rooms they are a member of.
    Each entry carries 'can_restore', which is true when the user holds
    'can_delete' in the room and is the one who deleted the entry.
    Returns:
        JSON response containing list of deleted files
    """
    # Get all rooms the user has access to
    if current_user.is_admin:
        rooms = Room.query.all()
    else:
        rooms = Room.query.filter(Room.members.any(id=current_user.id)).all()
    rooms_by_id = {room.id: room for room in rooms}
    room_names = {room.id: room.name for room in rooms}
    # Get all deleted files from accessible rooms
    files = RoomFile.query.filter(
        RoomFile.room_id.in_(list(rooms_by_id)),
        RoomFile.deleted == True
    ).all()
    # The delete permission is looked up once per room instead of once per file
    delete_permission_by_room = {}
    result = []
    for f in files:
        uploader_full_name = None
        uploader_profile_pic = None
        if f.uploader:
            uploader_full_name = f.uploader.username
            if getattr(f.uploader, 'last_name', None):
                uploader_full_name += ' ' + f.uploader.last_name
            uploader_profile_pic = f.uploader.profile_picture if getattr(f.uploader, 'profile_picture', None) else None
        deleter_full_name = None
        if f.deleter:
            deleter_full_name = f.deleter.username
            if getattr(f.deleter, 'last_name', None):
                deleter_full_name += ' ' + f.deleter.last_name
        # Check if user has delete permission in this room
        if f.room_id not in delete_permission_by_room:
            room = rooms_by_id.get(f.room_id)
            delete_permission_by_room[f.room_id] = (
                user_has_permission(room, 'can_delete') if room else False
            )
        has_delete_permission = delete_permission_by_room[f.room_id]
        # Check if user can restore this file
        can_restore = has_delete_permission and f.deleted_by == current_user.id
        result.append({
            'name': f.name,
            'type': f.type,
            'size': f.size if f.type == 'file' else '-',
            'modified': f.modified,
            'uploaded_by': uploader_full_name,
            'uploader_profile_pic': uploader_profile_pic,
            'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
            'path': f.path,
            'room_id': f.room_id,
            'room_name': room_names.get(f.room_id),
            'deleted_by': deleter_full_name,
            'deleted_at': f.deleted_at.isoformat() if f.deleted_at else None,
            'can_restore': can_restore
        })
    return jsonify(result)
@room_files_bp.route('/<int:room_id>/restore', methods=['POST'])
@login_required
def restore_file(room_id):
    """
    Restore a deleted file or folder from trash.
    Expects a JSON body with 'filename' and optional 'path' (containing
    folder). Only the user who deleted the entry may restore it, and the
    'can_delete' permission is required. Restoring a folder also restores
    the records inside it, mirroring the cascade applied on deletion.
    Args:
        room_id (int): ID of the room containing the deleted file
    Returns:
        JSON response indicating success or error
    """
    room = Room.query.get_or_404(room_id)
    # Check for delete permission instead of view permission
    if not user_has_permission(room, 'can_delete'):
        abort(403)
    # A missing or malformed JSON body is treated as empty input
    data = request.get_json(silent=True) or {}
    filename = data.get('filename', '').strip()
    rel_path = clean_path(data.get('path', ''))
    if not filename:
        return jsonify({'error': 'Invalid filename'}), 400
    # Lookup in RoomFile
    rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
    if not rf:
        return jsonify({'error': 'File not found'}), 404
    # Check if the current user was the one who deleted the file
    if rf.deleted_by != current_user.id:
        return jsonify({'error': 'You can only restore files that you deleted'}), 403
    # Restore file by setting deleted to False
    rf.deleted = False
    if rf.type == 'folder':
        # Deleting a folder trashes its contents, so bring them back too;
        # otherwise the folder would reappear empty. autoescape keeps '_' / '%'
        # in names from acting as LIKE wildcards.
        folder_path = f"{rf.path}/{rf.name}" if rf.path else rf.name
        contained_items = RoomFile.query.filter(
            RoomFile.room_id == room_id,
            RoomFile.deleted == True,
            (RoomFile.path == folder_path)
            | RoomFile.path.startswith(f"{folder_path}/", autoescape=True)
        ).all()
        for item in contained_items:
            item.deleted = False
    db.session.commit()
    log_event(
        event_type='file_restore',
        details={
            'restored_by': f"{current_user.username} {current_user.last_name}",
            'filename': filename,
            'room_id': room_id,
            'path': rel_path,
            'type': rf.type,
            'size': rf.size if rf.type == 'file' else None
        },
        user_id=current_user.id
    )
    db.session.commit()
    return jsonify({'success': True})
@room_files_bp.route('/<int:room_id>/delete-permanent', methods=['POST'])
@login_required
def delete_permanent(room_id):
    """
    Permanently delete files from trash (admin only).
    Expects a JSON body with 'filename' and optional 'path'. A filename of
    '*' removes every trashed entry of the room. Each entry is removed from
    disk and from the database; an entry whose removal fails is skipped so
    the remaining ones are still processed.
    Args:
        room_id (int): ID of the room containing the files to delete
    Returns:
        JSON response indicating success or error
    """
    # Only allow admin users to permanently delete files
    if not current_user.is_admin:
        abort(403)
    room = Room.query.get_or_404(room_id)
    # A missing or malformed JSON body is treated as empty input
    data = request.get_json(silent=True) or {}
    filename = data.get('filename', '').strip()
    rel_path = clean_path(data.get('path', ''))
    if not filename:
        return jsonify({'error': 'Invalid filename'}), 400
    # If filename is '*', delete all deleted files in the room
    if filename == '*':
        files_to_delete = RoomFile.query.filter_by(room_id=room_id, deleted=True).all()
    else:
        # Lookup specific file
        target = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
        if not target:
            return jsonify({'error': 'File not found'}), 404
        files_to_delete = [target]
    room_dir = get_room_dir(room_id)
    for rf in files_to_delete:
        # Delete the file/folder from storage
        try:
            file_path = os.path.join(room_dir, rf.path or '', rf.name)
            if os.path.exists(file_path):
                if rf.type == 'file':
                    os.remove(file_path)
                elif rf.type == 'folder':
                    # For folders, we need to delete all contents recursively
                    shutil.rmtree(file_path)
            if rf.type == 'folder':
                # Drop the records of everything inside the folder, even when
                # the directory was already gone from disk. Match the folder
                # itself or true descendants only, so that purging "docs"
                # leaves a sibling "docs2" alone; autoescape keeps '_' / '%'
                # in names from acting as LIKE wildcards.
                folder_path = f"{rf.path}/{rf.name}" if rf.path else rf.name
                contained_items = RoomFile.query.filter(
                    RoomFile.room_id == room_id,
                    (RoomFile.path == folder_path)
                    | RoomFile.path.startswith(f"{folder_path}/", autoescape=True)
                ).all()
                for item in contained_items:
                    db.session.delete(item)
            # Delete the database record
            db.session.delete(rf)
            log_event(
                event_type='file_delete_permanent',
                details={
                    'deleted_by': f"{current_user.username} {current_user.last_name}",
                    'filename': rf.name,
                    'room_id': room_id,
                    'path': rf.path,
                    'type': rf.type,
                    'size': rf.size if rf.type == 'file' else None
                },
                user_id=current_user.id
            )
        except Exception as e:
            # Best effort: report the failure and carry on with the next entry
            print(f"Error deleting file {rf.name}: {str(e)}")
            continue
    # Commit all changes
    try:
        db.session.commit()
    except Exception as e:
        print(f"Error committing changes: {str(e)}")
        db.session.rollback()
        return jsonify({'error': 'Failed to delete files'}), 500
    return jsonify({'success': True})