996 lines
34 KiB
Python
996 lines
34 KiB
Python
"""
|
|
Room Files Management Module
|
|
|
|
This module provides a Flask Blueprint for managing files within rooms in the DocuPulse application.
|
|
It handles file operations such as uploading, downloading, moving, deleting, and organizing files
|
|
within room contexts. The module implements permission-based access control and supports various
|
|
file operations including file organization, starring, and trash management.
|
|
|
|
Key Features:
|
|
- File upload and download with permission checks
|
|
- Folder creation and management
|
|
- File operations (move, rename, delete)
|
|
- File starring system
|
|
- Trash management with restore capability
|
|
- Bulk operations (zip download)
|
|
- File search functionality
|
|
|
|
The module uses a combination of database records (RoomFile model) and actual file system storage
|
|
to maintain file metadata and content.
|
|
"""
|
|
|
|
from flask import Blueprint, jsonify, request, abort, send_from_directory, send_file
|
|
from flask_login import login_required, current_user
|
|
import os
|
|
from models import Room, RoomMemberPermission, RoomFile, TrashedFile, db
|
|
from werkzeug.utils import secure_filename, safe_join
|
|
import time
|
|
import shutil
|
|
import io
|
|
import zipfile
|
|
from datetime import datetime
|
|
from utils import log_event
|
|
|
|
# Blueprint for room file operations
|
|
room_files_bp = Blueprint('room_files', __name__, url_prefix='/api/rooms')
|
|
|
|
# Root directory for storing room files
|
|
DATA_ROOT = '/data/rooms' # This should be a Docker volume
|
|
|
|
# Set of allowed file extensions for upload
|
|
ALLOWED_EXTENSIONS = {
|
|
# Documents
|
|
'pdf', 'docx', 'doc', 'txt', 'rtf', 'odt', 'md', 'csv',
|
|
# Spreadsheets
|
|
'xlsx', 'xls', 'ods', 'xlsm',
|
|
# Presentations
|
|
'pptx', 'ppt', 'odp',
|
|
# Images
|
|
'jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp', 'tiff',
|
|
# Archives
|
|
'zip', 'rar', '7z', 'tar', 'gz',
|
|
# Code/Text
|
|
'py', 'js', 'html', 'css', 'json', 'xml', 'sql', 'sh', 'bat',
|
|
# Audio
|
|
'mp3', 'wav', 'ogg', 'm4a', 'flac',
|
|
# Video
|
|
'mp4', 'avi', 'mov', 'wmv', 'flv', 'mkv', 'webm',
|
|
# CAD/Design
|
|
'dwg', 'dxf', 'ai', 'psd', 'eps', 'indd',
|
|
# Other
|
|
'eml', 'msg', 'vcf', 'ics'
|
|
}
|
|
|
|
def get_room_dir(room_id):
|
|
"""
|
|
Get the absolute path to a room's directory.
|
|
|
|
Args:
|
|
room_id (int): The ID of the room
|
|
|
|
Returns:
|
|
str: Absolute path to the room's directory
|
|
"""
|
|
return os.path.join(DATA_ROOT, str(room_id))
|
|
|
|
def user_has_permission(room, perm_name):
|
|
"""
|
|
Check if the current user has a specific permission in a room.
|
|
|
|
Args:
|
|
room (Room): The room object to check permissions for
|
|
perm_name (str): Name of the permission to check
|
|
|
|
Returns:
|
|
bool: True if user has permission, False otherwise
|
|
"""
|
|
if current_user.is_admin:
|
|
return True
|
|
perm = RoomMemberPermission.query.filter_by(room_id=room.id, user_id=current_user.id).first()
|
|
return getattr(perm, perm_name, False) if perm else False
|
|
|
|
def allowed_file(filename):
|
|
"""
|
|
Check if a file's extension is in the allowed extensions list.
|
|
|
|
Args:
|
|
filename (str): Name of the file to check
|
|
|
|
Returns:
|
|
bool: True if file extension is allowed, False otherwise
|
|
"""
|
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
|
|
|
|
def clean_path(path):
|
|
"""
|
|
Clean a path string by removing leading/trailing slashes.
|
|
|
|
Args:
|
|
path (str): Path string to clean
|
|
|
|
Returns:
|
|
str: Cleaned path string
|
|
"""
|
|
if not path:
|
|
return ''
|
|
return path.strip('/\\')
|
|
|
|
@room_files_bp.route('/<int:room_id>/files', methods=['GET'])
|
|
@login_required
|
|
def list_room_files(room_id):
|
|
"""
|
|
List all files in a room's directory.
|
|
|
|
Args:
|
|
room_id (int): ID of the room to list files from
|
|
|
|
Returns:
|
|
JSON response containing list of files with their metadata
|
|
"""
|
|
room = Room.query.get_or_404(room_id)
|
|
if not user_has_permission(room, 'can_view'):
|
|
abort(403)
|
|
|
|
path = request.args.get('path', '')
|
|
path = clean_path(path)
|
|
|
|
# Get files in the current path
|
|
files = RoomFile.query.filter_by(room_id=room_id, path=path, deleted=False).all()
|
|
|
|
# Debug: Check user permissions
|
|
if not current_user.is_admin:
|
|
perm = RoomMemberPermission.query.filter_by(room_id=room.id, user_id=current_user.id).first()
|
|
print("=== User Permissions ===")
|
|
print(f" - can_view: {getattr(perm, 'can_view', False) if perm else False}")
|
|
print(f" - can_download: {getattr(perm, 'can_download', False) if perm else False}")
|
|
print(f" - can_upload: {getattr(perm, 'can_upload', False) if perm else False}")
|
|
print(f" - can_delete: {getattr(perm, 'can_delete', False) if perm else False}")
|
|
print(f" - can_rename: {getattr(perm, 'can_rename', False) if perm else False}")
|
|
print(f" - can_move: {getattr(perm, 'can_move', False) if perm else False}")
|
|
print(f" - can_share: {getattr(perm, 'can_share', False) if perm else False}")
|
|
print("-------------------")
|
|
|
|
result = []
|
|
for f in files:
|
|
uploader_full_name = None
|
|
uploader_profile_pic = None
|
|
if f.uploader:
|
|
uploader_full_name = f.uploader.username
|
|
if getattr(f.uploader, 'last_name', None):
|
|
uploader_full_name += ' ' + f.uploader.last_name
|
|
uploader_profile_pic = f.uploader.profile_picture if getattr(f.uploader, 'profile_picture', None) else None
|
|
result.append({
|
|
'name': f.name,
|
|
'type': f.type,
|
|
'size': f.size if f.type == 'file' else '-',
|
|
'modified': f.modified,
|
|
'uploaded_by': uploader_full_name,
|
|
'uploader_profile_pic': uploader_profile_pic,
|
|
'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
|
|
'path': f.path,
|
|
'starred': current_user in f.starred_by
|
|
})
|
|
print(f"Returning {len(result)} files") # Debug log
|
|
return jsonify(result)
|
|
|
|
@room_files_bp.route('/<int:room_id>/files/upload', methods=['POST'])
|
|
@login_required
|
|
def upload_room_file(room_id):
|
|
room = Room.query.get_or_404(room_id)
|
|
if not user_has_permission(room, 'can_upload'):
|
|
abort(403)
|
|
if 'file' not in request.files:
|
|
return jsonify({'error': 'No file part'}), 400
|
|
file = request.files['file']
|
|
if file.filename == '':
|
|
return jsonify({'error': 'No selected file'}), 400
|
|
if not allowed_file(file.filename):
|
|
return jsonify({'error': 'File type not allowed'}), 400
|
|
filename = secure_filename(file.filename)
|
|
room_dir = get_room_dir(room_id)
|
|
rel_path = clean_path(request.form.get('path', ''))
|
|
target_dir = os.path.join(room_dir, rel_path) if rel_path else room_dir
|
|
os.makedirs(target_dir, exist_ok=True)
|
|
file_path = os.path.join(target_dir, filename)
|
|
|
|
# Check for overwrite flag
|
|
overwrite = request.form.get('overwrite', 'false').lower() == 'true'
|
|
|
|
# First check for non-deleted files
|
|
existing_file = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path, deleted=False).first()
|
|
if existing_file and not overwrite:
|
|
return jsonify({'error': 'A file with this name already exists in this location', 'conflict': True}), 409
|
|
|
|
# Then check for deleted files
|
|
trashed_file = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path, deleted=True).first()
|
|
if trashed_file:
|
|
# If we're not overwriting, return conflict
|
|
if not overwrite:
|
|
return jsonify({'error': 'A file with this name exists in the trash', 'conflict': True}), 409
|
|
|
|
# If we are overwriting, delete the trashed file record
|
|
db.session.delete(trashed_file)
|
|
db.session.commit()
|
|
existing_file = None
|
|
|
|
file.save(file_path)
|
|
stat = os.stat(file_path)
|
|
if existing_file:
|
|
# Overwrite: update the RoomFile record
|
|
existing_file.size = stat.st_size
|
|
existing_file.modified = stat.st_mtime
|
|
existing_file.uploaded_by = current_user.id
|
|
existing_file.uploaded_at = datetime.utcnow()
|
|
db.session.commit()
|
|
log_event(
|
|
event_type='file_upload',
|
|
details={
|
|
'uploaded_by': f"{current_user.username} {current_user.last_name}",
|
|
'filename': filename,
|
|
'room_id': room_id,
|
|
'path': rel_path,
|
|
'size': stat.st_size,
|
|
'overwritten': True
|
|
},
|
|
user_id=current_user.id
|
|
)
|
|
return jsonify({'success': True, 'filename': filename, 'overwritten': True})
|
|
else:
|
|
rf = RoomFile(
|
|
room_id=room_id,
|
|
name=filename,
|
|
path=rel_path,
|
|
type='file',
|
|
size=stat.st_size,
|
|
modified=stat.st_mtime,
|
|
uploaded_by=current_user.id,
|
|
uploaded_at=datetime.utcnow()
|
|
)
|
|
db.session.add(rf)
|
|
db.session.commit()
|
|
log_event(
|
|
event_type='file_upload',
|
|
details={
|
|
'uploaded_by': f"{current_user.username} {current_user.last_name}",
|
|
'filename': filename,
|
|
'room_id': room_id,
|
|
'path': rel_path,
|
|
'size': stat.st_size,
|
|
'overwritten': False
|
|
},
|
|
user_id=current_user.id
|
|
)
|
|
return jsonify({'success': True, 'filename': filename})
|
|
|
|
@room_files_bp.route('/<int:room_id>/files/<filename>', methods=['GET'])
|
|
@login_required
|
|
def download_room_file(room_id, filename):
|
|
"""
|
|
Download a file from a room.
|
|
|
|
Args:
|
|
room_id (int): ID of the room containing the file
|
|
filename (str): Name of the file to download
|
|
|
|
Returns:
|
|
File download response or error message
|
|
"""
|
|
room = Room.query.get_or_404(room_id)
|
|
if not user_has_permission(room, 'can_download'):
|
|
abort(403)
|
|
rel_path = clean_path(request.args.get('path', ''))
|
|
# Lookup in RoomFile
|
|
rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
|
|
if not rf or rf.type != 'file':
|
|
return jsonify({'error': 'File not found'}), 404
|
|
room_dir = get_room_dir(room_id)
|
|
file_path = os.path.join(room_dir, rel_path, filename) if rel_path else os.path.join(room_dir, filename)
|
|
if not os.path.exists(file_path):
|
|
return jsonify({'error': 'File not found'}), 404
|
|
|
|
log_event(
|
|
event_type='file_download',
|
|
details={
|
|
'downloaded_by': f"{current_user.username} {current_user.last_name}",
|
|
'filename': filename,
|
|
'room_id': room_id,
|
|
'path': rel_path,
|
|
'size': rf.size if rf else None
|
|
},
|
|
user_id=current_user.id
|
|
)
|
|
return send_from_directory(os.path.dirname(file_path), filename, as_attachment=True)
|
|
|
|
@room_files_bp.route('/<int:room_id>/files/<path:filename>', methods=['DELETE'])
|
|
@login_required
|
|
def delete_file(room_id, filename):
|
|
"""
|
|
Delete a file from a room (moves to trash).
|
|
|
|
Args:
|
|
room_id (int): ID of the room containing the file
|
|
filename (str): Name of the file to delete
|
|
|
|
Returns:
|
|
JSON response indicating success or error
|
|
"""
|
|
room = Room.query.get_or_404(room_id)
|
|
if not user_has_permission(room, 'can_delete'):
|
|
abort(403)
|
|
|
|
rel_path = clean_path(request.args.get('path', ''))
|
|
|
|
# Lookup in RoomFile
|
|
rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
|
|
if not rf:
|
|
return jsonify({'error': 'File not found'}), 404
|
|
|
|
# Mark as deleted and record who deleted it and when
|
|
rf.deleted = True
|
|
rf.deleted_by = current_user.id
|
|
rf.deleted_at = datetime.utcnow()
|
|
db.session.commit()
|
|
|
|
log_event(
|
|
event_type='file_delete',
|
|
details={
|
|
'deleted_by': f"{current_user.username} {current_user.last_name}",
|
|
'filename': filename,
|
|
'room_id': room_id,
|
|
'path': rel_path,
|
|
'type': rf.type,
|
|
'size': rf.size if rf.type == 'file' else None
|
|
},
|
|
user_id=current_user.id
|
|
)
|
|
|
|
return jsonify({'success': True})
|
|
|
|
@room_files_bp.route('/<int:room_id>/folders', methods=['POST'])
|
|
@login_required
|
|
def create_room_folder(room_id):
|
|
"""
|
|
Create a new folder in a room.
|
|
|
|
Args:
|
|
room_id (int): ID of the room to create folder in
|
|
|
|
Returns:
|
|
JSON response indicating success or error
|
|
"""
|
|
room = Room.query.get_or_404(room_id)
|
|
if not user_has_permission(room, 'can_upload'):
|
|
abort(403)
|
|
data = request.get_json()
|
|
folder_name = data.get('name', '').strip()
|
|
rel_path = clean_path(data.get('path', ''))
|
|
|
|
if not folder_name or '/' in folder_name or '\\' in folder_name or folder_name.startswith('.'):
|
|
return jsonify({'error': 'Invalid folder name'}), 400
|
|
|
|
room_dir = get_room_dir(room_id)
|
|
target_dir = os.path.join(room_dir, rel_path) if rel_path else room_dir
|
|
os.makedirs(target_dir, exist_ok=True)
|
|
folder_path = os.path.join(target_dir, folder_name)
|
|
|
|
# First check for trashed folder
|
|
trashed_folder = RoomFile.query.filter_by(
|
|
room_id=room_id,
|
|
name=folder_name,
|
|
path=rel_path,
|
|
type='folder',
|
|
deleted=True
|
|
).first()
|
|
|
|
if trashed_folder:
|
|
return jsonify({'error': 'A folder with this name exists in the trash'}), 409
|
|
|
|
# Then check for existing folder in current location
|
|
existing_folder = RoomFile.query.filter_by(
|
|
room_id=room_id,
|
|
name=folder_name,
|
|
path=rel_path,
|
|
type='folder',
|
|
deleted=False
|
|
).first()
|
|
|
|
if existing_folder:
|
|
return jsonify({'error': 'A folder with this name already exists in this location'}), 400
|
|
|
|
if os.path.exists(folder_path):
|
|
return jsonify({'error': 'A folder with this name already exists in this location'}), 400
|
|
|
|
os.makedirs(folder_path)
|
|
# Add RoomFile entry
|
|
stat = os.stat(folder_path)
|
|
rf = RoomFile(
|
|
room_id=room_id,
|
|
name=folder_name,
|
|
path=rel_path,
|
|
type='folder',
|
|
size=None,
|
|
modified=stat.st_mtime,
|
|
uploaded_by=current_user.id,
|
|
uploaded_at=datetime.utcnow()
|
|
)
|
|
db.session.add(rf)
|
|
db.session.commit()
|
|
return jsonify({'success': True, 'name': folder_name})
|
|
|
|
@room_files_bp.route('/<int:room_id>/rename', methods=['POST'])
|
|
@login_required
|
|
def rename_room_file(room_id):
|
|
"""
|
|
Rename a file or folder in a room.
|
|
|
|
Args:
|
|
room_id (int): ID of the room containing the file/folder
|
|
|
|
Returns:
|
|
JSON response indicating success or error
|
|
"""
|
|
room = Room.query.get_or_404(room_id)
|
|
# Allow rename if user can upload or delete
|
|
if not (user_has_permission(room, 'can_upload') or user_has_permission(room, 'can_delete')):
|
|
abort(403)
|
|
data = request.get_json()
|
|
old_name = data.get('old_name', '').strip()
|
|
new_name = data.get('new_name', '').strip()
|
|
rel_path = clean_path(data.get('path', ''))
|
|
if not old_name or not new_name or '/' in new_name or '\\' in new_name or new_name.startswith('.'):
|
|
return jsonify({'error': 'Invalid name'}), 400
|
|
# Lookup in RoomFile
|
|
rf = RoomFile.query.filter_by(room_id=room_id, name=old_name, path=rel_path).first()
|
|
if not rf:
|
|
return jsonify({'error': 'Original file/folder not found'}), 404
|
|
room_dir = get_room_dir(room_id)
|
|
base_dir = os.path.join(room_dir, rel_path) if rel_path else room_dir
|
|
old_path = os.path.join(base_dir, old_name)
|
|
new_path = os.path.join(base_dir, new_name)
|
|
if not os.path.exists(old_path):
|
|
return jsonify({'error': 'Original file/folder not found'}), 404
|
|
if os.path.exists(new_path):
|
|
return jsonify({'error': 'A file or folder with the new name already exists'}), 400
|
|
# Prevent file extension change for files
|
|
if os.path.isfile(old_path):
|
|
old_ext = os.path.splitext(old_name)[1].lower()
|
|
new_ext = os.path.splitext(new_name)[1].lower()
|
|
if old_ext != new_ext:
|
|
return jsonify({'error': 'File extension cannot be changed'}), 400
|
|
os.rename(old_path, new_path)
|
|
# Update RoomFile entry
|
|
rf.name = new_name
|
|
rf.modified = os.path.getmtime(new_path)
|
|
|
|
# If this is a folder, update paths of all contained files and subfolders
|
|
if os.path.isdir(new_path):
|
|
old_folder_path = os.path.join(rel_path, old_name) if rel_path else old_name
|
|
new_folder_path = os.path.join(rel_path, new_name) if rel_path else new_name
|
|
|
|
# Get all files and folders that are under the old path
|
|
contained_items = RoomFile.query.filter(
|
|
RoomFile.room_id == room_id,
|
|
RoomFile.path.like(f"{old_folder_path}%")
|
|
).all()
|
|
|
|
# Update their paths
|
|
for item in contained_items:
|
|
# Replace the old folder path with the new one in the item's path
|
|
item.path = item.path.replace(old_folder_path, new_folder_path, 1)
|
|
|
|
db.session.commit()
|
|
|
|
log_event(
|
|
event_type='file_rename',
|
|
details={
|
|
'renamed_by': f"{current_user.username} {current_user.last_name}",
|
|
'old_name': old_name,
|
|
'new_name': new_name,
|
|
'room_id': room_id,
|
|
'path': rel_path,
|
|
'type': rf.type,
|
|
'size': rf.size if rf.type == 'file' else None,
|
|
'is_folder': os.path.isdir(new_path)
|
|
},
|
|
user_id=current_user.id
|
|
)
|
|
|
|
return jsonify({'success': True, 'old_name': old_name, 'new_name': new_name})
|
|
|
|
@room_files_bp.route('/<int:room_id>/download-zip', methods=['POST'])
|
|
@login_required
|
|
def download_zip(room_id):
|
|
"""
|
|
Download multiple files as a zip archive.
|
|
|
|
Args:
|
|
room_id (int): ID of the room containing the files
|
|
|
|
Returns:
|
|
ZIP file download response
|
|
"""
|
|
room = Room.query.get_or_404(room_id)
|
|
if not user_has_permission(room, 'can_view'):
|
|
abort(403)
|
|
data = request.get_json()
|
|
items = data.get('items', [])
|
|
if not items or not isinstance(items, list):
|
|
return jsonify({'error': 'No items selected'}), 400
|
|
room_dir = get_room_dir(room_id)
|
|
mem_zip = io.BytesIO()
|
|
with zipfile.ZipFile(mem_zip, 'w', zipfile.ZIP_DEFLATED) as zf:
|
|
for item in items:
|
|
name = item.get('name')
|
|
rel_path = item.get('path', '').strip('/\\')
|
|
abs_path = os.path.join(room_dir, rel_path, name) if rel_path else os.path.join(room_dir, name)
|
|
if not os.path.exists(abs_path):
|
|
continue
|
|
if os.path.isfile(abs_path):
|
|
zf.write(abs_path, arcname=name)
|
|
elif os.path.isdir(abs_path):
|
|
for root, dirs, files in os.walk(abs_path):
|
|
for file in files:
|
|
file_path = os.path.join(root, file)
|
|
arcname = os.path.relpath(file_path, room_dir)
|
|
zf.write(file_path, arcname=arcname)
|
|
mem_zip.seek(0)
|
|
return send_file(mem_zip, mimetype='application/zip', as_attachment=True, download_name='download.zip')
|
|
|
|
@room_files_bp.route('/<int:room_id>/search', methods=['GET'])
|
|
@login_required
|
|
def search_room_files(room_id):
|
|
"""
|
|
Search for files in a room by name.
|
|
|
|
Args:
|
|
room_id (int): ID of the room to search in
|
|
|
|
Returns:
|
|
JSON response containing matching files
|
|
"""
|
|
room = Room.query.get_or_404(room_id)
|
|
if not user_has_permission(room, 'can_view'):
|
|
abort(403)
|
|
query = request.args.get('query', '').strip().lower()
|
|
# Search RoomFile for this room
|
|
files = RoomFile.query.filter(RoomFile.room_id==room_id).all()
|
|
matches = []
|
|
for f in files:
|
|
if query in f.name.lower():
|
|
matches.append({
|
|
'name': f.name,
|
|
'type': f.type,
|
|
'size': f.size if f.type == 'file' else '-',
|
|
'modified': f.modified,
|
|
'uploaded_by': f.uploader.username if f.uploader else None,
|
|
'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
|
|
'path': f.path,
|
|
})
|
|
return jsonify(matches)
|
|
|
|
@room_files_bp.route('/<int:room_id>/move', methods=['POST'])
|
|
@login_required
|
|
def move_room_file(room_id):
|
|
"""
|
|
Move a file or folder to a different location within a room.
|
|
|
|
Args:
|
|
room_id (int): ID of the room containing the file/folder
|
|
|
|
Returns:
|
|
JSON response indicating success or error
|
|
"""
|
|
room = Room.query.get_or_404(room_id)
|
|
if not user_has_permission(room, 'can_move'):
|
|
abort(403)
|
|
data = request.get_json()
|
|
filename = data.get('filename', '').strip()
|
|
source_path = clean_path(data.get('source_path', ''))
|
|
target_path = clean_path(data.get('target_path', ''))
|
|
|
|
if not filename or '/' in filename or '\\' in filename or filename.startswith('.'):
|
|
return jsonify({'error': 'Invalid filename'}), 400
|
|
|
|
# Lookup in RoomFile
|
|
rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=source_path).first()
|
|
if not rf:
|
|
return jsonify({'error': 'File not found'}), 404
|
|
|
|
room_dir = get_room_dir(room_id)
|
|
source_dir = os.path.join(room_dir, source_path) if source_path else room_dir
|
|
target_dir = os.path.join(room_dir, target_path) if target_path else room_dir
|
|
|
|
source_file_path = os.path.join(source_dir, filename)
|
|
target_file_path = os.path.join(target_dir, filename)
|
|
|
|
if not os.path.exists(source_file_path):
|
|
return jsonify({'error': 'Source file not found'}), 404
|
|
|
|
if os.path.exists(target_file_path):
|
|
return jsonify({'error': 'A file with this name already exists in the target location'}), 400
|
|
|
|
# Create target directory if it doesn't exist
|
|
os.makedirs(target_dir, exist_ok=True)
|
|
|
|
# Move the file
|
|
shutil.move(source_file_path, target_file_path)
|
|
|
|
# Update RoomFile entry
|
|
rf.path = target_path
|
|
rf.modified = os.path.getmtime(target_file_path)
|
|
db.session.commit()
|
|
|
|
log_event(
|
|
event_type='file_move',
|
|
details={
|
|
'moved_by': f"{current_user.username} {current_user.last_name}",
|
|
'filename': filename,
|
|
'room_id': room_id,
|
|
'source_path': source_path,
|
|
'target_path': target_path,
|
|
'type': rf.type,
|
|
'size': rf.size if rf.type == 'file' else None
|
|
},
|
|
user_id=current_user.id
|
|
)
|
|
|
|
return jsonify({'success': True})
|
|
|
|
@room_files_bp.route('/<int:room_id>/folders', methods=['GET'])
|
|
@login_required
|
|
def list_room_folders(room_id):
|
|
"""
|
|
List all folders in a room.
|
|
|
|
Args:
|
|
room_id (int): ID of the room to list folders from
|
|
|
|
Returns:
|
|
JSON response containing list of folder paths
|
|
"""
|
|
room = Room.query.get_or_404(room_id)
|
|
if not user_has_permission(room, 'can_view'):
|
|
abort(403)
|
|
|
|
# Get all files in the room that are not deleted
|
|
files = RoomFile.query.filter_by(room_id=room_id, deleted=False).all()
|
|
|
|
# Extract unique folder paths
|
|
folders = set()
|
|
for f in files:
|
|
if f.type == 'folder':
|
|
full_path = f.path + '/' + f.name if f.path else f.name
|
|
folders.add(full_path)
|
|
|
|
# Convert to sorted list
|
|
folder_list = sorted(list(folders))
|
|
|
|
return jsonify(folder_list)
|
|
|
|
@room_files_bp.route('/<int:room_id>/star', methods=['POST'])
|
|
@login_required
|
|
def toggle_star(room_id):
|
|
"""
|
|
Toggle the starred status of a file.
|
|
|
|
Args:
|
|
room_id (int): ID of the room containing the file
|
|
|
|
Returns:
|
|
JSON response indicating success and new starred status
|
|
"""
|
|
room = Room.query.get_or_404(room_id)
|
|
if not user_has_permission(room, 'can_view'):
|
|
abort(403)
|
|
|
|
data = request.get_json()
|
|
filename = data.get('filename', '').strip()
|
|
rel_path = clean_path(data.get('path', ''))
|
|
|
|
if not filename:
|
|
return jsonify({'error': 'Invalid filename'}), 400
|
|
|
|
# Lookup in RoomFile
|
|
rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
|
|
if not rf:
|
|
return jsonify({'error': 'File not found'}), 404
|
|
|
|
# Check if the file is already starred by this user
|
|
is_starred = current_user in rf.starred_by
|
|
|
|
if is_starred:
|
|
# Unstar the file
|
|
rf.starred_by.remove(current_user)
|
|
else:
|
|
# Star the file
|
|
rf.starred_by.append(current_user)
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({'success': True, 'starred': not is_starred})
|
|
|
|
@room_files_bp.route('/<int:room_id>/starred', methods=['GET'])
|
|
@login_required
|
|
def get_starred_files(room_id):
|
|
"""
|
|
Get all starred files in a room.
|
|
|
|
Args:
|
|
room_id (int): ID of the room to get starred files from
|
|
|
|
Returns:
|
|
JSON response containing list of starred files
|
|
"""
|
|
room = Room.query.get_or_404(room_id)
|
|
if not user_has_permission(room, 'can_view'):
|
|
abort(403)
|
|
|
|
# Get all starred files in the room
|
|
files = RoomFile.query.filter_by(room_id=room_id, starred=True).all()
|
|
|
|
result = []
|
|
for f in files:
|
|
uploader_full_name = None
|
|
uploader_profile_pic = None
|
|
if f.uploader:
|
|
uploader_full_name = f.uploader.username
|
|
if getattr(f.uploader, 'last_name', None):
|
|
uploader_full_name += ' ' + f.uploader.last_name
|
|
uploader_profile_pic = f.uploader.profile_picture if getattr(f.uploader, 'profile_picture', None) else None
|
|
result.append({
|
|
'name': f.name,
|
|
'type': f.type,
|
|
'size': f.size if f.type == 'file' else '-',
|
|
'modified': f.modified,
|
|
'uploaded_by': uploader_full_name,
|
|
'uploader_profile_pic': uploader_profile_pic,
|
|
'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
|
|
'path': f.path,
|
|
'starred': f.starred
|
|
})
|
|
|
|
return jsonify(result)
|
|
|
|
@room_files_bp.route('/starred', methods=['GET'])
|
|
@login_required
|
|
def get_all_starred_files():
|
|
"""
|
|
Get all starred files across all accessible rooms.
|
|
|
|
Returns:
|
|
JSON response containing list of all starred files
|
|
"""
|
|
# Get all rooms the user has access to
|
|
if current_user.is_admin:
|
|
rooms = Room.query.all()
|
|
else:
|
|
rooms = Room.query.filter(Room.members.any(id=current_user.id)).all()
|
|
|
|
room_ids = [room.id for room in rooms]
|
|
room_names = {room.id: room.name for room in rooms}
|
|
|
|
# Get all files starred by the current user from accessible rooms
|
|
files = RoomFile.query.filter(
|
|
RoomFile.room_id.in_(room_ids),
|
|
RoomFile.starred_by.contains(current_user)
|
|
).all()
|
|
|
|
result = []
|
|
for f in files:
|
|
uploader_full_name = None
|
|
uploader_profile_pic = None
|
|
if f.uploader:
|
|
uploader_full_name = f.uploader.username
|
|
if getattr(f.uploader, 'last_name', None):
|
|
uploader_full_name += ' ' + f.uploader.last_name
|
|
uploader_profile_pic = f.uploader.profile_picture if getattr(f.uploader, 'profile_picture', None) else None
|
|
result.append({
|
|
'name': f.name,
|
|
'type': f.type,
|
|
'size': f.size if f.type == 'file' else '-',
|
|
'modified': f.modified,
|
|
'uploaded_by': uploader_full_name,
|
|
'uploader_profile_pic': uploader_profile_pic,
|
|
'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
|
|
'path': f.path,
|
|
'starred': True,
|
|
'room_id': f.room_id,
|
|
'room_name': room_names.get(f.room_id)
|
|
})
|
|
|
|
return jsonify(result)
|
|
|
|
@room_files_bp.route('/trash', methods=['GET'])
|
|
@login_required
|
|
def get_trash_files():
|
|
"""
|
|
Get all deleted files from accessible rooms.
|
|
|
|
Returns:
|
|
JSON response containing list of deleted files
|
|
"""
|
|
# Get all rooms the user has access to
|
|
if current_user.is_admin:
|
|
rooms = Room.query.all()
|
|
else:
|
|
rooms = Room.query.filter(Room.members.any(id=current_user.id)).all()
|
|
|
|
room_ids = [room.id for room in rooms]
|
|
room_names = {room.id: room.name for room in rooms}
|
|
|
|
# Get all deleted files from accessible rooms
|
|
files = RoomFile.query.filter(
|
|
RoomFile.room_id.in_(room_ids),
|
|
RoomFile.deleted == True
|
|
).all()
|
|
|
|
result = []
|
|
for f in files:
|
|
uploader_full_name = None
|
|
uploader_profile_pic = None
|
|
if f.uploader:
|
|
uploader_full_name = f.uploader.username
|
|
if getattr(f.uploader, 'last_name', None):
|
|
uploader_full_name += ' ' + f.uploader.last_name
|
|
uploader_profile_pic = f.uploader.profile_picture if getattr(f.uploader, 'profile_picture', None) else None
|
|
|
|
deleter_full_name = None
|
|
if f.deleter:
|
|
deleter_full_name = f.deleter.username
|
|
if getattr(f.deleter, 'last_name', None):
|
|
deleter_full_name += ' ' + f.deleter.last_name
|
|
|
|
# Check if user has delete permission in this room
|
|
room = Room.query.get(f.room_id)
|
|
has_delete_permission = user_has_permission(room, 'can_delete') if room else False
|
|
|
|
# Check if user can restore this file
|
|
can_restore = has_delete_permission and f.deleted_by == current_user.id
|
|
|
|
result.append({
|
|
'name': f.name,
|
|
'type': f.type,
|
|
'size': f.size if f.type == 'file' else '-',
|
|
'modified': f.modified,
|
|
'uploaded_by': uploader_full_name,
|
|
'uploader_profile_pic': uploader_profile_pic,
|
|
'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
|
|
'path': f.path,
|
|
'room_id': f.room_id,
|
|
'room_name': room_names.get(f.room_id),
|
|
'deleted_by': deleter_full_name,
|
|
'deleted_at': f.deleted_at.isoformat() if f.deleted_at else None,
|
|
'can_restore': can_restore
|
|
})
|
|
|
|
return jsonify(result)
|
|
|
|
@room_files_bp.route('/<int:room_id>/restore', methods=['POST'])
|
|
@login_required
|
|
def restore_file(room_id):
|
|
"""
|
|
Restore a deleted file from trash.
|
|
|
|
Args:
|
|
room_id (int): ID of the room containing the deleted file
|
|
|
|
Returns:
|
|
JSON response indicating success or error
|
|
"""
|
|
room = Room.query.get_or_404(room_id)
|
|
# Check for delete permission instead of view permission
|
|
if not user_has_permission(room, 'can_delete'):
|
|
abort(403)
|
|
|
|
data = request.get_json()
|
|
filename = data.get('filename', '').strip()
|
|
rel_path = clean_path(data.get('path', ''))
|
|
|
|
if not filename:
|
|
return jsonify({'error': 'Invalid filename'}), 400
|
|
|
|
# Lookup in RoomFile
|
|
rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
|
|
if not rf:
|
|
return jsonify({'error': 'File not found'}), 404
|
|
|
|
# Check if the current user was the one who deleted the file
|
|
if rf.deleted_by != current_user.id:
|
|
return jsonify({'error': 'You can only restore files that you deleted'}), 403
|
|
|
|
# Restore file by setting deleted to False
|
|
rf.deleted = False
|
|
db.session.commit()
|
|
|
|
log_event(
|
|
event_type='file_restore',
|
|
details={
|
|
'restored_by': f"{current_user.username} {current_user.last_name}",
|
|
'filename': filename,
|
|
'room_id': room_id,
|
|
'path': rel_path,
|
|
'type': rf.type,
|
|
'size': rf.size if rf.type == 'file' else None
|
|
},
|
|
user_id=current_user.id
|
|
)
|
|
|
|
return jsonify({'success': True})
|
|
|
|
@room_files_bp.route('/<int:room_id>/delete-permanent', methods=['POST'])
|
|
@login_required
|
|
def delete_permanent(room_id):
|
|
"""
|
|
Permanently delete files from trash (admin only).
|
|
|
|
Args:
|
|
room_id (int): ID of the room containing the files to delete
|
|
|
|
Returns:
|
|
JSON response indicating success or error
|
|
"""
|
|
# Only allow admin users to permanently delete files
|
|
if not current_user.is_admin:
|
|
abort(403)
|
|
|
|
room = Room.query.get_or_404(room_id)
|
|
data = request.get_json()
|
|
filename = data.get('filename', '').strip()
|
|
rel_path = clean_path(data.get('path', ''))
|
|
|
|
if not filename:
|
|
return jsonify({'error': 'Invalid filename'}), 400
|
|
|
|
# If filename is '*', delete all deleted files in the room
|
|
if filename == '*':
|
|
files_to_delete = RoomFile.query.filter_by(room_id=room_id, deleted=True).all()
|
|
else:
|
|
# Lookup specific file
|
|
files_to_delete = [RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()]
|
|
if not files_to_delete[0]:
|
|
return jsonify({'error': 'File not found'}), 404
|
|
|
|
for rf in files_to_delete:
|
|
if not rf:
|
|
continue
|
|
|
|
# Delete the file/folder from storage
|
|
try:
|
|
file_path = os.path.join(get_room_dir(room_id), rf.path, rf.name)
|
|
if os.path.exists(file_path):
|
|
if rf.type == 'file':
|
|
os.remove(file_path)
|
|
elif rf.type == 'folder':
|
|
# For folders, we need to delete all contents recursively
|
|
shutil.rmtree(file_path)
|
|
|
|
# Also delete all database records for files and subfolders
|
|
folder_path = os.path.join(rf.path, rf.name) if rf.path else rf.name
|
|
contained_items = RoomFile.query.filter(
|
|
RoomFile.room_id == room_id,
|
|
RoomFile.path.like(f"{folder_path}%")
|
|
).all()
|
|
|
|
for item in contained_items:
|
|
db.session.delete(item)
|
|
|
|
log_event(
|
|
event_type='file_delete_permanent',
|
|
details={
|
|
'deleted_by': f"{current_user.username} {current_user.last_name}",
|
|
'filename': rf.name,
|
|
'room_id': room_id,
|
|
'path': rf.path,
|
|
'type': rf.type,
|
|
'size': rf.size if rf.type == 'file' else None
|
|
},
|
|
user_id=current_user.id
|
|
)
|
|
except Exception as e:
|
|
print(f"Error deleting {rf.type} from storage: {e}")
|
|
|
|
# Delete the database record
|
|
db.session.delete(rf)
|
|
|
|
db.session.commit()
|
|
return jsonify({'success': True}) |