utils and event logging

This commit is contained in:
2025-05-29 15:19:42 +02:00
parent 6d959ac253
commit 5dbdd43785
23 changed files with 657 additions and 101 deletions

21
utils/__init__.py Normal file
View File

@@ -0,0 +1,21 @@
# Utils package initialization
from .permissions import user_has_permission, get_user_permissions
from .event_logger import log_event, get_user_events, get_room_events, get_recent_events, get_events_by_type, get_events_by_date_range
from .path_utils import clean_path, secure_file_path
from .time_utils import timeago, format_datetime, parse_datetime
__all__ = [
'user_has_permission',
'get_user_permissions',
'log_event',
'get_user_events',
'get_room_events',
'get_recent_events',
'get_events_by_type',
'get_events_by_date_range',
'clean_path',
'secure_file_path',
'timeago',
'format_datetime',
'parse_datetime'
]

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -2,26 +2,28 @@ from flask import request
from models import Event, EventType, db
from typing import Optional, Dict, Any, List
from datetime import datetime
from flask_login import current_user
from sqlalchemy import desc
def log_event(
event_type: EventType,
user_id: int,
details: Optional[Dict[str, Any]] = None
) -> Event:
def log_event(event_type: str, details: Optional[Dict[str, Any]] = None, user_id: Optional[int] = None) -> Event:
"""
Log an event in the system.
Log an event to the database.
Args:
event_type: The type of event from EventType enum
user_id: The ID of the user performing the action
details: Optional dictionary containing additional event-specific data
event_type: The type of event (must match EventType enum)
details: Optional dictionary containing event details
user_id: Optional user ID (defaults to current user)
Returns:
The created Event object
"""
if user_id is None and current_user.is_authenticated:
user_id = current_user.id
event = Event(
event_type=event_type.value,
event_type=event_type,
user_id=user_id,
timestamp=datetime.utcnow(),
details=details or {},
ip_address=request.remote_addr if request else None,
user_agent=request.user_agent.string if request and request.user_agent else None
@@ -31,93 +33,27 @@ def log_event(
db.session.commit()
return event
def get_user_events(
user_id: int,
event_type: Optional[EventType] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: int = 100
) -> List[Event]:
"""
Retrieve events for a specific user with optional filtering.
Args:
user_id: The ID of the user to get events for
event_type: Optional event type to filter by
start_date: Optional start date to filter events
end_date: Optional end date to filter events
limit: Maximum number of events to return
Returns:
List of Event objects matching the criteria
"""
query = Event.query.filter_by(user_id=user_id)
if event_type:
query = query.filter_by(event_type=event_type.value)
if start_date:
query = query.filter(Event.timestamp >= start_date)
if end_date:
query = query.filter(Event.timestamp <= end_date)
return query.order_by(Event.timestamp.desc()).limit(limit).all()
def get_user_events(user_id: int, limit: int = 50) -> List[Event]:
"""Get recent events for a specific user"""
return Event.query.filter_by(user_id=user_id).order_by(desc(Event.timestamp)).limit(limit).all()
def get_room_events(
room_id: int,
event_type: Optional[EventType] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: int = 100
) -> List[Event]:
"""
Retrieve events related to a specific room with optional filtering.
Args:
room_id: The ID of the room to get events for
event_type: Optional event type to filter by
start_date: Optional start date to filter events
end_date: Optional end date to filter events
limit: Maximum number of events to return
Returns:
List of Event objects matching the criteria
"""
query = Event.query.filter(Event.details['room_id'].astext.cast(Integer) == room_id)
if event_type:
query = query.filter_by(event_type=event_type.value)
if start_date:
query = query.filter(Event.timestamp >= start_date)
if end_date:
query = query.filter(Event.timestamp <= end_date)
return query.order_by(Event.timestamp.desc()).limit(limit).all()
def get_room_events(room_id: int, limit: int = 50) -> List[Event]:
"""Get recent events for a specific room"""
return Event.query.filter(
Event.details['room_id'].astext == str(room_id)
).order_by(desc(Event.timestamp)).limit(limit).all()
def get_recent_events(
event_type: Optional[EventType] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: int = 100
) -> List[Event]:
"""
Retrieve recent events across the system with optional filtering.
Args:
event_type: Optional event type to filter by
start_date: Optional start date to filter events
end_date: Optional end date to filter events
limit: Maximum number of events to return
Returns:
List of Event objects matching the criteria
"""
query = Event.query
if event_type:
query = query.filter_by(event_type=event_type.value)
if start_date:
query = query.filter(Event.timestamp >= start_date)
if end_date:
query = query.filter(Event.timestamp <= end_date)
return query.order_by(Event.timestamp.desc()).limit(limit).all()
def get_recent_events(limit: int = 50) -> List[Event]:
"""Get most recent events across all types"""
return Event.query.order_by(desc(Event.timestamp)).limit(limit).all()
def get_events_by_type(event_type: str, limit: int = 50) -> List[Event]:
"""Get recent events of a specific type"""
return Event.query.filter_by(event_type=event_type).order_by(desc(Event.timestamp)).limit(limit).all()
def get_events_by_date_range(start_date: datetime, end_date: datetime, limit: int = 50) -> List[Event]:
"""Get events within a date range"""
return Event.query.filter(
Event.timestamp >= start_date,
Event.timestamp <= end_date
).order_by(desc(Event.timestamp)).limit(limit).all()

59
utils/path_utils.py Normal file
View File

@@ -0,0 +1,59 @@
import os
from typing import Optional
from werkzeug.utils import secure_filename
def clean_path(path: str, base_dir: Optional[str] = None) -> str:
"""
Clean and secure a file path, ensuring it's safe and within allowed directories.
Args:
path: The path to clean
base_dir: Optional base directory to ensure path stays within
Returns:
str: Cleaned and secured path
"""
# Remove any leading/trailing slashes and normalize path separators
path = path.strip('/\\')
path = os.path.normpath(path)
# If base_dir is provided, ensure path stays within it
if base_dir:
base_dir = os.path.normpath(base_dir)
# Get absolute paths
abs_path = os.path.abspath(path)
abs_base = os.path.abspath(base_dir)
# Check if path is within base directory
if not abs_path.startswith(abs_base):
raise ValueError("Path is outside of allowed directory")
# Return path relative to base directory
return os.path.relpath(abs_path, abs_base)
# If no base_dir, just return the cleaned path
return path
def secure_file_path(filename: str) -> str:
"""
Secure a filename using Werkzeug's secure_filename and add a timestamp.
Args:
filename: The original filename
Returns:
str: Secured filename with timestamp
"""
from datetime import datetime
# Get file extension
_, ext = os.path.splitext(filename)
# Secure the base filename
secure_name = secure_filename(filename)
# Add timestamp to prevent filename collisions
timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
# Combine parts
return f"{os.path.splitext(secure_name)[0]}_{timestamp}{ext}"

83
utils/permissions.py Normal file
View File

@@ -0,0 +1,83 @@
from models import RoomMemberPermission, Room
from flask_login import current_user
from typing import Optional
def user_has_permission(room_id: int, permission_type: str, user_id: Optional[int] = None) -> bool:
"""
Check if a user has a specific permission in a room.
Args:
room_id: The ID of the room to check permissions for
permission_type: The type of permission to check (e.g., 'can_upload', 'can_download')
user_id: Optional user ID (defaults to current user)
Returns:
bool: True if the user has the permission, False otherwise
"""
if user_id is None:
if not current_user.is_authenticated:
return False
user_id = current_user.id
# Admins have all permissions
if current_user.is_authenticated and current_user.is_admin:
return True
# Check room membership and permissions
permission = RoomMemberPermission.query.filter_by(
room_id=room_id,
user_id=user_id
).first()
if not permission:
return False
# Check if the specific permission is granted
return getattr(permission, permission_type, False)
def get_user_permissions(room_id: int, user_id: Optional[int] = None) -> dict:
"""
Get all permissions for a user in a room.
Args:
room_id: The ID of the room to get permissions for
user_id: Optional user ID (defaults to current user)
Returns:
dict: Dictionary containing all permissions for the user
"""
if user_id is None:
if not current_user.is_authenticated:
return {}
user_id = current_user.id
# Admins have all permissions
if current_user.is_authenticated and current_user.is_admin:
return {
'can_upload': True,
'can_download': True,
'can_delete': True,
'can_rename': True,
'can_move': True,
'can_share': True,
'can_manage_members': True
}
# Get user's permissions
permission = RoomMemberPermission.query.filter_by(
room_id=room_id,
user_id=user_id
).first()
if not permission:
return {}
return {
'can_upload': permission.can_upload,
'can_download': permission.can_download,
'can_delete': permission.can_delete,
'can_rename': permission.can_rename,
'can_move': permission.can_move,
'can_share': permission.can_share,
'can_manage_members': permission.can_manage_members
}

109
utils/time_utils.py Normal file
View File

@@ -0,0 +1,109 @@
from datetime import datetime, timedelta
from typing import Union, Optional
def timeago(dt: Union[datetime, str], now: Optional[datetime] = None) -> str:
"""
Convert a datetime to a human-readable relative time string.
Args:
dt: The datetime to convert (can be datetime object or ISO format string)
now: Optional reference time (defaults to current time)
Returns:
str: Human-readable relative time string (e.g., "2 hours ago", "3 days ago")
"""
if isinstance(dt, str):
dt = datetime.fromisoformat(dt.replace('Z', '+00:00'))
if now is None:
now = datetime.utcnow()
diff = now - dt
# Less than a minute
if diff < timedelta(minutes=1):
return "just now"
# Less than an hour
if diff < timedelta(hours=1):
minutes = int(diff.total_seconds() / 60)
return f"{minutes} minute{'s' if minutes != 1 else ''} ago"
# Less than a day
if diff < timedelta(days=1):
hours = int(diff.total_seconds() / 3600)
return f"{hours} hour{'s' if hours != 1 else ''} ago"
# Less than a week
if diff < timedelta(days=7):
days = diff.days
return f"{days} day{'s' if days != 1 else ''} ago"
# Less than a month
if diff < timedelta(days=30):
weeks = int(diff.days / 7)
return f"{weeks} week{'s' if weeks != 1 else ''} ago"
# Less than a year
if diff < timedelta(days=365):
months = int(diff.days / 30)
return f"{months} month{'s' if months != 1 else ''} ago"
# More than a year
years = int(diff.days / 365)
return f"{years} year{'s' if years != 1 else ''} ago"
def format_datetime(dt: Union[datetime, str], format: str = "%Y-%m-%d %H:%M:%S") -> str:
"""
Format a datetime object or ISO string to a specified format.
Args:
dt: The datetime to format (can be datetime object or ISO format string)
format: The format string to use (defaults to "YYYY-MM-DD HH:MM:SS")
Returns:
str: Formatted datetime string
"""
if isinstance(dt, str):
dt = datetime.fromisoformat(dt.replace('Z', '+00:00'))
return dt.strftime(format)
def parse_datetime(dt_str: str) -> datetime:
"""
Parse a datetime string in various formats to a datetime object.
Args:
dt_str: The datetime string to parse
Returns:
datetime: Parsed datetime object
Raises:
ValueError: If the string cannot be parsed
"""
# Try ISO format first
try:
return datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
except ValueError:
pass
# Try common formats
formats = [
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d %H:%M",
"%Y-%m-%d",
"%d/%m/%Y %H:%M:%S",
"%d/%m/%Y %H:%M",
"%d/%m/%Y",
"%m/%d/%Y %H:%M:%S",
"%m/%d/%Y %H:%M",
"%m/%d/%Y"
]
for fmt in formats:
try:
return datetime.strptime(dt_str, fmt)
except ValueError:
continue
raise ValueError(f"Could not parse datetime string: {dt_str}")