Files
docupulse/routes/room_files.py
2025-05-25 10:31:22 +02:00

668 lines
25 KiB
Python

from flask import Blueprint, jsonify, request, abort, send_from_directory, send_file
from flask_login import login_required, current_user
import os
from models import Room, RoomMemberPermission, RoomFile, TrashedFile, db
from werkzeug.utils import secure_filename, safe_join
import time
import shutil
import io
import zipfile
from datetime import datetime
room_files_bp = Blueprint('room_files', __name__, url_prefix='/api/rooms')
DATA_ROOT = '/data/rooms' # This should be a Docker volume
ALLOWED_EXTENSIONS = {
# Documents
'pdf', 'docx', 'doc', 'txt', 'rtf', 'odt', 'md', 'csv',
# Spreadsheets
'xlsx', 'xls', 'ods', 'xlsm',
# Presentations
'pptx', 'ppt', 'odp',
# Images
'jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp', 'tiff',
# Archives
'zip', 'rar', '7z', 'tar', 'gz',
# Code/Text
'py', 'js', 'html', 'css', 'json', 'xml', 'sql', 'sh', 'bat',
# Audio
'mp3', 'wav', 'ogg', 'm4a', 'flac',
# Video
'mp4', 'avi', 'mov', 'wmv', 'flv', 'mkv', 'webm',
# CAD/Design
'dwg', 'dxf', 'ai', 'psd', 'eps', 'indd',
# Other
'eml', 'msg', 'vcf', 'ics'
}
def get_room_dir(room_id):
return os.path.join(DATA_ROOT, str(room_id))
def user_has_permission(room, perm_name):
if current_user.is_admin:
return True
perm = RoomMemberPermission.query.filter_by(room_id=room.id, user_id=current_user.id).first()
return getattr(perm, perm_name, False) if perm else False
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def clean_path(path):
if not path:
return ''
return path.strip('/\\')
@room_files_bp.route('/<int:room_id>/files', methods=['GET'])
@login_required
def list_room_files(room_id):
room = Room.query.get_or_404(room_id)
if not user_has_permission(room, 'can_view'):
abort(403)
path = request.args.get('path', '')
path = clean_path(path)
# Get files in the current path
files = RoomFile.query.filter_by(room_id=room_id, path=path, deleted=False).all()
# Debug: Check user permissions
if not current_user.is_admin:
perm = RoomMemberPermission.query.filter_by(room_id=room.id, user_id=current_user.id).first()
print("=== User Permissions ===")
print(f" - can_view: {getattr(perm, 'can_view', False) if perm else False}")
print(f" - can_download: {getattr(perm, 'can_download', False) if perm else False}")
print(f" - can_upload: {getattr(perm, 'can_upload', False) if perm else False}")
print(f" - can_delete: {getattr(perm, 'can_delete', False) if perm else False}")
print(f" - can_rename: {getattr(perm, 'can_rename', False) if perm else False}")
print(f" - can_move: {getattr(perm, 'can_move', False) if perm else False}")
print(f" - can_share: {getattr(perm, 'can_share', False) if perm else False}")
print("-------------------")
result = []
for f in files:
uploader_full_name = None
uploader_profile_pic = None
if f.uploader:
uploader_full_name = f.uploader.username
if getattr(f.uploader, 'last_name', None):
uploader_full_name += ' ' + f.uploader.last_name
uploader_profile_pic = f.uploader.profile_picture if getattr(f.uploader, 'profile_picture', None) else None
result.append({
'name': f.name,
'type': f.type,
'size': f.size if f.type == 'file' else '-',
'modified': f.modified,
'uploaded_by': uploader_full_name,
'uploader_profile_pic': uploader_profile_pic,
'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
'path': f.path,
'starred': current_user in f.starred_by
})
print(f"Returning {len(result)} files") # Debug log
return jsonify(result)
@room_files_bp.route('/<int:room_id>/files/upload', methods=['POST'])
@login_required
def upload_room_file(room_id):
room = Room.query.get_or_404(room_id)
if not user_has_permission(room, 'can_upload'):
abort(403)
if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
if not allowed_file(file.filename):
return jsonify({'error': 'File type not allowed'}), 400
filename = secure_filename(file.filename)
room_dir = get_room_dir(room_id)
rel_path = clean_path(request.form.get('path', ''))
target_dir = os.path.join(room_dir, rel_path) if rel_path else room_dir
os.makedirs(target_dir, exist_ok=True)
file_path = os.path.join(target_dir, filename)
# Check for overwrite flag
overwrite = request.form.get('overwrite', 'false').lower() == 'true'
# First check for non-deleted files
existing_file = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path, deleted=False).first()
if existing_file and not overwrite:
return jsonify({'error': 'A file with this name already exists in this location', 'conflict': True}), 409
# Then check for deleted files
trashed_file = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path, deleted=True).first()
if trashed_file:
# If we're not overwriting, return conflict
if not overwrite:
return jsonify({'error': 'A file with this name exists in the trash', 'conflict': True}), 409
# If we are overwriting, delete the trashed file record
db.session.delete(trashed_file)
db.session.commit()
existing_file = None
file.save(file_path)
stat = os.stat(file_path)
if existing_file:
# Overwrite: update the RoomFile record
existing_file.size = stat.st_size
existing_file.modified = stat.st_mtime
existing_file.uploaded_by = current_user.id
existing_file.uploaded_at = datetime.utcnow()
db.session.commit()
return jsonify({'success': True, 'filename': filename, 'overwritten': True})
else:
rf = RoomFile(
room_id=room_id,
name=filename,
path=rel_path,
type='file',
size=stat.st_size,
modified=stat.st_mtime,
uploaded_by=current_user.id,
uploaded_at=datetime.utcnow()
)
db.session.add(rf)
db.session.commit()
return jsonify({'success': True, 'filename': filename})
@room_files_bp.route('/<int:room_id>/files/<filename>', methods=['GET'])
@login_required
def download_room_file(room_id, filename):
room = Room.query.get_or_404(room_id)
if not user_has_permission(room, 'can_download'):
abort(403)
rel_path = clean_path(request.args.get('path', ''))
# Lookup in RoomFile
rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
if not rf or rf.type != 'file':
return jsonify({'error': 'File not found'}), 404
room_dir = get_room_dir(room_id)
file_path = os.path.join(room_dir, rel_path, filename) if rel_path else os.path.join(room_dir, filename)
if not os.path.exists(file_path):
return jsonify({'error': 'File not found'}), 404
return send_from_directory(os.path.dirname(file_path), filename, as_attachment=True)
@room_files_bp.route('/<int:room_id>/files/<path:filename>', methods=['DELETE'])
@login_required
def delete_file(room_id, filename):
room = Room.query.get_or_404(room_id)
if not user_has_permission(room, 'can_delete'):
abort(403)
rel_path = clean_path(request.args.get('path', ''))
# Lookup in RoomFile
rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
if not rf:
return jsonify({'error': 'File not found'}), 404
# Mark as deleted and record who deleted it and when
rf.deleted = True
rf.deleted_by = current_user.id
rf.deleted_at = datetime.utcnow()
db.session.commit()
return jsonify({'success': True})
@room_files_bp.route('/<int:room_id>/folders', methods=['POST'])
@login_required
def create_room_folder(room_id):
room = Room.query.get_or_404(room_id)
if not user_has_permission(room, 'can_upload'):
abort(403)
data = request.get_json()
folder_name = data.get('name', '').strip()
rel_path = clean_path(data.get('path', ''))
if not folder_name or '/' in folder_name or '\\' in folder_name or folder_name.startswith('.'):
return jsonify({'error': 'Invalid folder name'}), 400
room_dir = get_room_dir(room_id)
target_dir = os.path.join(room_dir, rel_path) if rel_path else room_dir
os.makedirs(target_dir, exist_ok=True)
folder_path = os.path.join(target_dir, folder_name)
# First check for trashed folder
trashed_folder = RoomFile.query.filter_by(
room_id=room_id,
name=folder_name,
path=rel_path,
type='folder',
deleted=True
).first()
if trashed_folder:
return jsonify({'error': 'A folder with this name exists in the trash'}), 409
# Then check for existing folder in current location
existing_folder = RoomFile.query.filter_by(
room_id=room_id,
name=folder_name,
path=rel_path,
type='folder',
deleted=False
).first()
if existing_folder:
return jsonify({'error': 'A folder with this name already exists in this location'}), 400
if os.path.exists(folder_path):
return jsonify({'error': 'A folder with this name already exists in this location'}), 400
os.makedirs(folder_path)
# Add RoomFile entry
stat = os.stat(folder_path)
rf = RoomFile(
room_id=room_id,
name=folder_name,
path=rel_path,
type='folder',
size=None,
modified=stat.st_mtime,
uploaded_by=current_user.id,
uploaded_at=datetime.utcnow()
)
db.session.add(rf)
db.session.commit()
return jsonify({'success': True, 'name': folder_name})
@room_files_bp.route('/<int:room_id>/rename', methods=['POST'])
@login_required
def rename_room_file(room_id):
room = Room.query.get_or_404(room_id)
# Allow rename if user can upload or delete
if not (user_has_permission(room, 'can_upload') or user_has_permission(room, 'can_delete')):
abort(403)
data = request.get_json()
old_name = data.get('old_name', '').strip()
new_name = data.get('new_name', '').strip()
rel_path = clean_path(data.get('path', ''))
if not old_name or not new_name or '/' in new_name or '\\' in new_name or new_name.startswith('.'):
return jsonify({'error': 'Invalid name'}), 400
# Lookup in RoomFile
rf = RoomFile.query.filter_by(room_id=room_id, name=old_name, path=rel_path).first()
if not rf:
return jsonify({'error': 'Original file/folder not found'}), 404
room_dir = get_room_dir(room_id)
base_dir = os.path.join(room_dir, rel_path) if rel_path else room_dir
old_path = os.path.join(base_dir, old_name)
new_path = os.path.join(base_dir, new_name)
if not os.path.exists(old_path):
return jsonify({'error': 'Original file/folder not found'}), 404
if os.path.exists(new_path):
return jsonify({'error': 'A file or folder with the new name already exists'}), 400
# Prevent file extension change for files
if os.path.isfile(old_path):
old_ext = os.path.splitext(old_name)[1].lower()
new_ext = os.path.splitext(new_name)[1].lower()
if old_ext != new_ext:
return jsonify({'error': 'File extension cannot be changed'}), 400
os.rename(old_path, new_path)
# Update RoomFile entry
rf.name = new_name
rf.modified = os.path.getmtime(new_path)
db.session.commit()
return jsonify({'success': True, 'old_name': old_name, 'new_name': new_name})
@room_files_bp.route('/<int:room_id>/download-zip', methods=['POST'])
@login_required
def download_zip(room_id):
room = Room.query.get_or_404(room_id)
if not user_has_permission(room, 'can_view'):
abort(403)
data = request.get_json()
items = data.get('items', [])
if not items or not isinstance(items, list):
return jsonify({'error': 'No items selected'}), 400
room_dir = get_room_dir(room_id)
mem_zip = io.BytesIO()
with zipfile.ZipFile(mem_zip, 'w', zipfile.ZIP_DEFLATED) as zf:
for item in items:
name = item.get('name')
rel_path = item.get('path', '').strip('/\\')
abs_path = os.path.join(room_dir, rel_path, name) if rel_path else os.path.join(room_dir, name)
if not os.path.exists(abs_path):
continue
if os.path.isfile(abs_path):
zf.write(abs_path, arcname=name)
elif os.path.isdir(abs_path):
for root, dirs, files in os.walk(abs_path):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, room_dir)
zf.write(file_path, arcname=arcname)
mem_zip.seek(0)
return send_file(mem_zip, mimetype='application/zip', as_attachment=True, download_name='download.zip')
@room_files_bp.route('/<int:room_id>/search', methods=['GET'])
@login_required
def search_room_files(room_id):
room = Room.query.get_or_404(room_id)
if not user_has_permission(room, 'can_view'):
abort(403)
query = request.args.get('query', '').strip().lower()
# Search RoomFile for this room
files = RoomFile.query.filter(RoomFile.room_id==room_id).all()
matches = []
for f in files:
if query in f.name.lower():
matches.append({
'name': f.name,
'type': f.type,
'size': f.size if f.type == 'file' else '-',
'modified': f.modified,
'uploaded_by': f.uploader.username if f.uploader else None,
'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
'path': f.path,
})
return jsonify(matches)
@room_files_bp.route('/<int:room_id>/move', methods=['POST'])
@login_required
def move_room_file(room_id):
room = Room.query.get_or_404(room_id)
if not user_has_permission(room, 'can_move'):
abort(403)
data = request.get_json()
filename = data.get('filename', '').strip()
source_path = clean_path(data.get('source_path', ''))
target_path = clean_path(data.get('target_path', ''))
if not filename or '/' in filename or '\\' in filename or filename.startswith('.'):
return jsonify({'error': 'Invalid filename'}), 400
# Lookup in RoomFile
rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=source_path).first()
if not rf:
return jsonify({'error': 'File not found'}), 404
room_dir = get_room_dir(room_id)
source_dir = os.path.join(room_dir, source_path) if source_path else room_dir
target_dir = os.path.join(room_dir, target_path) if target_path else room_dir
source_file_path = os.path.join(source_dir, filename)
target_file_path = os.path.join(target_dir, filename)
if not os.path.exists(source_file_path):
return jsonify({'error': 'Source file not found'}), 404
if os.path.exists(target_file_path):
return jsonify({'error': 'A file with this name already exists in the target location'}), 400
# Create target directory if it doesn't exist
os.makedirs(target_dir, exist_ok=True)
# Move the file
shutil.move(source_file_path, target_file_path)
# Update RoomFile entry
rf.path = target_path
rf.modified = os.path.getmtime(target_file_path)
db.session.commit()
return jsonify({'success': True})
@room_files_bp.route('/<int:room_id>/folders', methods=['GET'])
@login_required
def list_room_folders(room_id):
room = Room.query.get_or_404(room_id)
if not user_has_permission(room, 'can_view'):
abort(403)
# Get all files in the room that are not deleted
files = RoomFile.query.filter_by(room_id=room_id, deleted=False).all()
# Extract unique folder paths
folders = set()
for f in files:
if f.type == 'folder':
full_path = f.path + '/' + f.name if f.path else f.name
folders.add(full_path)
# Convert to sorted list
folder_list = sorted(list(folders))
return jsonify(folder_list)
@room_files_bp.route('/<int:room_id>/star', methods=['POST'])
@login_required
def toggle_star(room_id):
room = Room.query.get_or_404(room_id)
if not user_has_permission(room, 'can_view'):
abort(403)
data = request.get_json()
filename = data.get('filename', '').strip()
rel_path = clean_path(data.get('path', ''))
if not filename:
return jsonify({'error': 'Invalid filename'}), 400
# Lookup in RoomFile
rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
if not rf:
return jsonify({'error': 'File not found'}), 404
# Check if the file is already starred by this user
is_starred = current_user in rf.starred_by
if is_starred:
# Unstar the file
rf.starred_by.remove(current_user)
else:
# Star the file
rf.starred_by.append(current_user)
db.session.commit()
return jsonify({'success': True, 'starred': not is_starred})
@room_files_bp.route('/<int:room_id>/starred', methods=['GET'])
@login_required
def get_starred_files(room_id):
room = Room.query.get_or_404(room_id)
if not user_has_permission(room, 'can_view'):
abort(403)
# Get all starred files in the room
files = RoomFile.query.filter_by(room_id=room_id, starred=True).all()
result = []
for f in files:
uploader_full_name = None
uploader_profile_pic = None
if f.uploader:
uploader_full_name = f.uploader.username
if getattr(f.uploader, 'last_name', None):
uploader_full_name += ' ' + f.uploader.last_name
uploader_profile_pic = f.uploader.profile_picture if getattr(f.uploader, 'profile_picture', None) else None
result.append({
'name': f.name,
'type': f.type,
'size': f.size if f.type == 'file' else '-',
'modified': f.modified,
'uploaded_by': uploader_full_name,
'uploader_profile_pic': uploader_profile_pic,
'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
'path': f.path,
'starred': f.starred
})
return jsonify(result)
@room_files_bp.route('/starred', methods=['GET'])
@login_required
def get_all_starred_files():
# Get all rooms the user has access to
if current_user.is_admin:
rooms = Room.query.all()
else:
rooms = Room.query.filter(Room.members.any(id=current_user.id)).all()
room_ids = [room.id for room in rooms]
room_names = {room.id: room.name for room in rooms}
# Get all files starred by the current user from accessible rooms
files = RoomFile.query.filter(
RoomFile.room_id.in_(room_ids),
RoomFile.starred_by.contains(current_user)
).all()
result = []
for f in files:
uploader_full_name = None
uploader_profile_pic = None
if f.uploader:
uploader_full_name = f.uploader.username
if getattr(f.uploader, 'last_name', None):
uploader_full_name += ' ' + f.uploader.last_name
uploader_profile_pic = f.uploader.profile_picture if getattr(f.uploader, 'profile_picture', None) else None
result.append({
'name': f.name,
'type': f.type,
'size': f.size if f.type == 'file' else '-',
'modified': f.modified,
'uploaded_by': uploader_full_name,
'uploader_profile_pic': uploader_profile_pic,
'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
'path': f.path,
'starred': True,
'room_id': f.room_id,
'room_name': room_names.get(f.room_id)
})
return jsonify(result)
@room_files_bp.route('/trash', methods=['GET'])
@login_required
def get_trash_files():
# Get all rooms the user has access to
if current_user.is_admin:
rooms = Room.query.all()
else:
rooms = Room.query.filter(Room.members.any(id=current_user.id)).all()
room_ids = [room.id for room in rooms]
room_names = {room.id: room.name for room in rooms}
# Get all deleted files from accessible rooms
files = RoomFile.query.filter(
RoomFile.room_id.in_(room_ids),
RoomFile.deleted == True
).all()
result = []
for f in files:
uploader_full_name = None
uploader_profile_pic = None
if f.uploader:
uploader_full_name = f.uploader.username
if getattr(f.uploader, 'last_name', None):
uploader_full_name += ' ' + f.uploader.last_name
uploader_profile_pic = f.uploader.profile_picture if getattr(f.uploader, 'profile_picture', None) else None
deleter_full_name = None
if f.deleter:
deleter_full_name = f.deleter.username
if getattr(f.deleter, 'last_name', None):
deleter_full_name += ' ' + f.deleter.last_name
# Check if user has delete permission in this room
room = Room.query.get(f.room_id)
has_delete_permission = user_has_permission(room, 'can_delete') if room else False
# Check if user can restore this file
can_restore = has_delete_permission and f.deleted_by == current_user.id
result.append({
'name': f.name,
'type': f.type,
'size': f.size if f.type == 'file' else '-',
'modified': f.modified,
'uploaded_by': uploader_full_name,
'uploader_profile_pic': uploader_profile_pic,
'uploaded_at': f.uploaded_at.isoformat() if f.uploaded_at else None,
'path': f.path,
'room_id': f.room_id,
'room_name': room_names.get(f.room_id),
'deleted_by': deleter_full_name,
'deleted_at': f.deleted_at.isoformat() if f.deleted_at else None,
'can_restore': can_restore
})
return jsonify(result)
@room_files_bp.route('/<int:room_id>/restore', methods=['POST'])
@login_required
def restore_file(room_id):
room = Room.query.get_or_404(room_id)
# Check for delete permission instead of view permission
if not user_has_permission(room, 'can_delete'):
abort(403)
data = request.get_json()
filename = data.get('filename', '').strip()
rel_path = clean_path(data.get('path', ''))
if not filename:
return jsonify({'error': 'Invalid filename'}), 400
# Lookup in RoomFile
rf = RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()
if not rf:
return jsonify({'error': 'File not found'}), 404
# Check if the current user was the one who deleted the file
if rf.deleted_by != current_user.id:
return jsonify({'error': 'You can only restore files that you deleted'}), 403
# Restore file by setting deleted to False
rf.deleted = False
db.session.commit()
return jsonify({'success': True})
@room_files_bp.route('/<int:room_id>/delete-permanent', methods=['POST'])
@login_required
def delete_permanent(room_id):
# Only allow admin users to permanently delete files
if not current_user.is_admin:
abort(403)
room = Room.query.get_or_404(room_id)
data = request.get_json()
filename = data.get('filename', '').strip()
rel_path = clean_path(data.get('path', ''))
if not filename:
return jsonify({'error': 'Invalid filename'}), 400
# If filename is '*', delete all deleted files in the room
if filename == '*':
files_to_delete = RoomFile.query.filter_by(room_id=room_id, deleted=True).all()
else:
# Lookup specific file
files_to_delete = [RoomFile.query.filter_by(room_id=room_id, name=filename, path=rel_path).first()]
if not files_to_delete[0]:
return jsonify({'error': 'File not found'}), 404
for rf in files_to_delete:
if not rf:
continue
# Delete the file from storage if it's a file
if rf.type == 'file':
try:
file_path = os.path.join(get_room_dir(room_id), rf.path, rf.name)
if os.path.exists(file_path):
os.remove(file_path)
except Exception as e:
print(f"Error deleting file from storage: {e}")
# Delete the database record
db.session.delete(rf)
db.session.commit()
return jsonify({'success': True})