Files
docupulse/models.py
2025-06-05 11:52:39 +02:00

447 lines
20 KiB
Python

from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime
from sqlalchemy.orm import relationship
from extensions import db
from enum import Enum
import json
# Many-to-many link table: one row per (room, user) membership pair.
# Both columns together form the composite primary key.
room_members = db.Table(
    'room_members',
    db.Column('room_id', db.Integer, db.ForeignKey('room.id'), primary_key=True),
    db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
)
# Many-to-many link table: one row per (conversation, user) membership pair.
# Both columns together form the composite primary key.
conversation_members = db.Table(
    'conversation_members',
    db.Column('conversation_id', db.Integer, db.ForeignKey('conversation.id'), primary_key=True),
    db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
)
class User(UserMixin, db.Model):
    """Application account: identity, contact details and credentials.

    ``password_hash`` is nullable, so an account can exist before any
    password has been stored for it.
    """

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    last_name = db.Column(db.String(150), nullable=False, default='--')
    email = db.Column(db.String(150), unique=True, nullable=False)
    password_hash = db.Column(db.String(256))
    is_admin = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    phone = db.Column(db.String(20))
    company = db.Column(db.String(100))
    position = db.Column(db.String(100))
    notes = db.Column(db.Text)
    # NOTE(review): presumably takes the place of the ``is_active`` attribute
    # supplied by UserMixin -- confirm against flask_login.
    is_active = db.Column(db.Boolean, default=True)
    profile_picture = db.Column(db.String(255))
    preferred_view = db.Column(db.String(10), default='grid', nullable=False)  # 'grid' or 'list'

    # Per-room permission rows; they are deleted together with the user.
    room_permissions = relationship(
        'RoomMemberPermission',
        back_populates='user',
        cascade='all, delete-orphan',
    )

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash.

        Returns False (instead of raising) when no hash has been stored,
        since ``check_password_hash`` cannot handle a ``None`` hash.
        """
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)

    def __repr__(self):
        return f'<User {self.username}>'
class Room(db.Model):
    """A named room that groups members, their permissions and files."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    created_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)

    # Relationships
    # The creating user; exposes ``User.created_rooms`` with delete cascade.
    creator = db.relationship(
        'User',
        foreign_keys=[created_by],
        backref=db.backref('created_rooms', cascade='all, delete-orphan'),
    )
    # Members through the ``room_members`` table; ``User.rooms`` is a dynamic query.
    members = db.relationship(
        'User',
        secondary=room_members,
        backref=db.backref('rooms', lazy='dynamic'),
    )
    member_permissions = relationship(
        'RoomMemberPermission',
        back_populates='room',
        cascade='all, delete-orphan',
    )
    files = db.relationship(
        'RoomFile',
        back_populates='room',
        cascade='all, delete-orphan',
    )

    def __repr__(self):
        return f'<Room {self.name}>'
# Association object carrying the permission flags of one user in one room.
class RoomMemberPermission(db.Model):
    """Permission flags for a (room, user) pair.

    The composite primary key is ``(room_id, user_id)``. Only ``can_view``
    defaults to True; every other flag defaults to False.
    """

    __tablename__ = 'room_member_permissions'

    room_id = db.Column(db.Integer, db.ForeignKey('room.id'), primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), primary_key=True)

    can_view = db.Column(db.Boolean, default=True, nullable=False)
    can_download = db.Column(db.Boolean, default=False, nullable=False)
    can_upload = db.Column(db.Boolean, default=False, nullable=False)
    can_delete = db.Column(db.Boolean, default=False, nullable=False)
    can_rename = db.Column(db.Boolean, default=False, nullable=False)
    can_move = db.Column(db.Boolean, default=False, nullable=False)
    can_share = db.Column(db.Boolean, default=False, nullable=False)

    # Relationships (paired with User.room_permissions / Room.member_permissions)
    user = relationship('User', back_populates='room_permissions')
    room = relationship('Room', back_populates='member_permissions')
class RoomFile(db.Model):
    """A file or folder entry stored inside a room.

    Deletion is tracked on the row itself through ``deleted``,
    ``deleted_by`` and ``deleted_at``.
    """

    __tablename__ = 'room_file'

    id = db.Column(db.Integer, primary_key=True)
    room_id = db.Column(db.Integer, db.ForeignKey('room.id'), nullable=False)
    name = db.Column(db.String(255), nullable=False)
    path = db.Column(db.String(255), nullable=False, default='')
    type = db.Column(db.String(10), nullable=False)  # 'file' or 'folder'
    size = db.Column(db.Integer)  # in bytes, null for folders
    modified = db.Column(db.Float)  # timestamp
    uploaded_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
    uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)
    deleted = db.Column(db.Boolean, default=False)  # deleted status flag
    deleted_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
    deleted_at = db.Column(db.DateTime)  # when the file was deleted

    # Two foreign keys point at ``user``, so each relationship names its own.
    uploader = db.relationship(
        'User',
        foreign_keys=[uploaded_by],
        backref=db.backref('uploaded_files', cascade='all, delete-orphan'),
    )
    deleter = db.relationship(
        'User',
        foreign_keys=[deleted_by],
        backref=db.backref('deleted_room_files', cascade='all, delete-orphan'),
    )
    room = db.relationship('Room', back_populates='files')
    # Users who starred this entry, linked through the ``user_starred_file`` table.
    starred_by = db.relationship(
        'User',
        secondary='user_starred_file',
        backref='starred_files',
    )

    def __repr__(self):
        return f'<RoomFile {self.name} ({self.type}) in {self.path}>'
class UserStarredFile(db.Model):
    """Records that a user starred a room file, and when."""

    __tablename__ = 'user_starred_file'

    # A (user_id, file_id) pair may appear only once, so a file cannot be
    # starred twice by the same user.
    __table_args__ = (
        db.UniqueConstraint('user_id', 'file_id', name='unique_user_file_star'),
    )

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
    file_id = db.Column(db.Integer, db.ForeignKey('room_file.id'), nullable=False)
    starred_at = db.Column(db.DateTime, default=datetime.utcnow)

    def __repr__(self):
        return f'<UserStarredFile user_id={self.user_id} file_id={self.file_id}>'
class TrashedFile(db.Model):
    """A file or folder entry that has been moved to the trash.

    Keeps the entry's original location (``original_path``) together with
    who uploaded it and who deleted it.
    """

    __tablename__ = 'trashed_file'

    id = db.Column(db.Integer, primary_key=True)
    room_id = db.Column(db.Integer, db.ForeignKey('room.id'), nullable=False)
    name = db.Column(db.String(255), nullable=False)
    original_path = db.Column(db.String(255), nullable=False, default='')
    type = db.Column(db.String(10), nullable=False)  # 'file' or 'folder'
    size = db.Column(db.Integer)  # in bytes, null for folders
    modified = db.Column(db.Float)  # timestamp
    uploaded_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
    uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)
    deleted_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
    deleted_at = db.Column(db.DateTime, default=datetime.utcnow)

    # ``room_id`` is NOT NULL, so trashed rows must be removed together with
    # their room. Without a delete cascade the ORM would try to set
    # ``room_id`` to NULL when a Room is deleted, which the column rejects.
    room = db.relationship(
        'Room',
        backref=db.backref('trashed_files', cascade='all, delete-orphan'),
    )
    # Two foreign keys point at ``user``, so each relationship names its own.
    uploader = db.relationship(
        'User',
        foreign_keys=[uploaded_by],
        backref=db.backref('uploaded_trashed_files', cascade='all, delete-orphan'),
    )
    deleter = db.relationship(
        'User',
        foreign_keys=[deleted_by],
        backref=db.backref('deleted_trashed_files', cascade='all, delete-orphan'),
    )

    def __repr__(self):
        return f'<TrashedFile {self.name} ({self.type}) from {self.original_path}>'
class SiteSettings(db.Model):
    """Branding colours and company details for the site."""

    id = db.Column(db.Integer, primary_key=True)
    primary_color = db.Column(db.String(7), default='#16767b')  # Default from colors.css
    secondary_color = db.Column(db.String(7), default='#741b5f')  # Default from colors.css
    company_name = db.Column(db.String(100))
    company_logo = db.Column(db.String(255))  # Store the filename of the logo
    company_website = db.Column(db.String(200))
    company_email = db.Column(db.String(100))
    company_phone = db.Column(db.String(20))
    company_address = db.Column(db.String(200))
    company_city = db.Column(db.String(100))
    company_state = db.Column(db.String(100))
    company_zip = db.Column(db.String(20))
    company_country = db.Column(db.String(100))
    company_description = db.Column(db.Text)
    company_industry = db.Column(db.String(100))
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    @classmethod
    def get_settings(cls):
        """Return the first settings row, creating and committing one if none exists."""
        existing = cls.query.first()
        if existing:
            return existing

        # No row yet: persist one that carries only the column defaults.
        created = cls()
        db.session.add(created)
        db.session.commit()
        return created
class DocuPulseSettings(db.Model):
    """Instance-wide limits for rooms, conversations and storage.

    ``get_settings`` returns the stored row (creating it on first use) and
    ``get_usage_stats`` reports current usage against those limits.
    """

    __tablename__ = 'docupulse_settings'

    # Limits used for a freshly created row and as the reported fallback
    # when the settings row cannot be read.
    DEFAULT_MAX_ROOMS = 10
    DEFAULT_MAX_CONVERSATIONS = 10
    DEFAULT_MAX_STORAGE = 10737418240  # 10GB in bytes

    id = db.Column(db.Integer, primary_key=True)
    max_rooms = db.Column(db.Integer, default=DEFAULT_MAX_ROOMS)
    max_conversations = db.Column(db.Integer, default=DEFAULT_MAX_CONVERSATIONS)
    max_storage = db.Column(db.BigInteger, default=DEFAULT_MAX_STORAGE)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    @classmethod
    def get_settings(cls):
        """Return the settings row, creating it with the defaults if missing.

        Returns:
            The settings instance, or None when the query or commit fails.
            On failure the session is rolled back and the error is logged.
        """
        import logging  # local import keeps this block self-contained

        try:
            settings = cls.query.first()
            if not settings:
                settings = cls(
                    max_rooms=cls.DEFAULT_MAX_ROOMS,
                    max_conversations=cls.DEFAULT_MAX_CONVERSATIONS,
                    max_storage=cls.DEFAULT_MAX_STORAGE,
                )
                db.session.add(settings)
                db.session.commit()
            return settings
        except Exception:
            # If there's an error (like integer overflow), rollback and return
            # None; callers treat None as "settings unavailable".
            db.session.rollback()
            logging.getLogger(__name__).exception('Failed to load DocuPulse settings')
            return None

    @staticmethod
    def _percentage(current, limit):
        """Return ``current`` as a percentage of ``limit``.

        Yields 0 when ``limit`` is NULL/None, zero or negative, so an unset
        limit column cannot raise during the comparison or division.
        """
        if not limit or limit <= 0:
            return 0
        return (current / limit) * 100

    @classmethod
    def get_usage_stats(cls):
        """Return a dict with the limits, current usage and usage percentages.

        When the settings cannot be retrieved, the default limits are
        reported with every usage figure set to 0.
        """
        settings = cls.get_settings()
        if not settings:
            # Return default values if settings can't be retrieved
            return {
                'max_rooms': cls.DEFAULT_MAX_ROOMS,
                'max_conversations': cls.DEFAULT_MAX_CONVERSATIONS,
                'max_storage': cls.DEFAULT_MAX_STORAGE,
                'current_rooms': 0,
                'current_conversations': 0,
                'current_storage': 0,
                'rooms_percentage': 0,
                'conversations_percentage': 0,
                'storage_percentage': 0,
            }

        total_rooms = Room.query.count()
        total_conversations = Conversation.query.count()
        # Sum the sizes of files not flagged as deleted; SUM over no rows
        # yields NULL, hence the ``or 0``.
        total_storage = (
            db.session.query(db.func.sum(RoomFile.size))
            .filter(RoomFile.deleted == False)  # noqa: E712 - SQL expression, not a Python comparison
            .scalar()
            or 0
        )

        return {
            'max_rooms': settings.max_rooms,
            'max_conversations': settings.max_conversations,
            'max_storage': settings.max_storage,
            'current_rooms': total_rooms,
            'current_conversations': total_conversations,
            'current_storage': total_storage,
            'rooms_percentage': cls._percentage(total_rooms, settings.max_rooms),
            'conversations_percentage': cls._percentage(total_conversations, settings.max_conversations),
            'storage_percentage': cls._percentage(total_storage, settings.max_storage),
        }
class KeyValueSettings(db.Model):
    """Generic key/value store; values are kept as text.

    Dicts and lists are stored as JSON, everything else as ``str(value)``.
    Reads attempt a JSON decode first and fall back to the raw text.
    """

    id = db.Column(db.Integer, primary_key=True)
    key = db.Column(db.String(100), unique=True, nullable=False)
    value = db.Column(db.Text)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    @classmethod
    def get_value(cls, key, default=None):
        """Return the value stored under ``key``, or ``default`` if absent.

        The stored text is decoded as JSON when possible; otherwise the raw
        stored value is returned unchanged.
        """
        record = cls.query.filter_by(key=key).first()
        if not record:
            return default

        try:
            return json.loads(record.value)
        except (json.JSONDecodeError, TypeError):
            # Not valid JSON (or no text at all): hand back what is stored.
            return record.value

    @classmethod
    def set_value(cls, key, value):
        """Create or update the row for ``key``, commit, and return the row."""
        record = cls.query.filter_by(key=key).first() or cls(key=key)

        is_container = isinstance(value, (dict, list))
        record.value = json.dumps(value) if is_container else str(value)

        db.session.add(record)
        db.session.commit()
        return record
class Conversation(db.Model):
    """A named conversation with members and messages."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    created_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)

    # Relationships
    # The creating user; exposes ``User.created_conversations`` with delete cascade.
    creator = db.relationship(
        'User',
        foreign_keys=[created_by],
        backref=db.backref('created_conversations', cascade='all, delete-orphan'),
    )
    # Members through ``conversation_members``; ``User.conversations`` is a dynamic query.
    members = db.relationship(
        'User',
        secondary=conversation_members,
        backref=db.backref('conversations', lazy='dynamic'),
    )
    messages = db.relationship(
        'Message',
        back_populates='conversation',
        cascade='all, delete-orphan',
    )

    def __repr__(self):
        return f'<Conversation {self.name}>'
class Message(db.Model):
    """A text message posted by a user in a conversation."""

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    conversation_id = db.Column(db.Integer, db.ForeignKey('conversation.id'), nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)

    # Relationships
    conversation = db.relationship('Conversation', back_populates='messages')
    user = db.relationship(
        'User',
        backref=db.backref('messages', cascade='all, delete-orphan'),
    )
    # Attachments are deleted together with the message.
    attachments = db.relationship(
        'MessageAttachment',
        back_populates='message',
        cascade='all, delete-orphan',
    )

    def __repr__(self):
        return f'<Message {self.id}>'
class MessageAttachment(db.Model):
    """Metadata for a file attached to a message."""

    id = db.Column(db.Integer, primary_key=True)
    message_id = db.Column(db.Integer, db.ForeignKey('message.id'), nullable=False)
    name = db.Column(db.String(255), nullable=False)
    path = db.Column(db.String(512), nullable=False)
    type = db.Column(db.String(100))
    size = db.Column(db.Integer)  # Size in bytes
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Relationships
    message = db.relationship('Message', back_populates='attachments')

    def __repr__(self):
        return f'<MessageAttachment {self.name}>'
class EventType(Enum):
    """String identifiers for auditable actions, grouped by subject.

    NOTE(review): presumably these values are what gets written to
    ``Event.event_type`` -- confirm against the callers.
    """

    # --- User events ---
    USER_LOGIN = 'user_login'
    USER_LOGOUT = 'user_logout'
    USER_CREATE = 'user_create'
    USER_UPDATE = 'user_update'
    USER_DELETE = 'user_delete'

    # --- Room events ---
    ROOM_CREATE = 'room_create'
    ROOM_UPDATE = 'room_update'
    ROOM_DELETE = 'room_delete'
    ROOM_MEMBER_ADD = 'room_member_add'
    ROOM_MEMBER_REMOVE = 'room_member_remove'
    ROOM_PERMISSION_UPDATE = 'room_permission_update'

    # --- File events ---
    FILE_UPLOAD = 'file_upload'
    FILE_DOWNLOAD = 'file_download'
    FILE_DELETE = 'file_delete'
    FILE_RENAME = 'file_rename'
    FILE_MOVE = 'file_move'
    FILE_STAR = 'file_star'
    FILE_UNSTAR = 'file_unstar'

    # --- Conversation events ---
    CONVERSATION_CREATE = 'conversation_create'
    CONVERSATION_UPDATE = 'conversation_update'
    CONVERSATION_DELETE = 'conversation_delete'
    CONVERSATION_MEMBER_ADD = 'conversation_member_add'
    CONVERSATION_MEMBER_REMOVE = 'conversation_member_remove'
    CONVERSATION_OPEN = 'conversation_open'

    # --- Message events ---
    MESSAGE_CREATE = 'message_create'
    MESSAGE_UPDATE = 'message_update'
    MESSAGE_DELETE = 'message_delete'
    MESSAGE_ATTACHMENT_ADD = 'message_attachment_add'
    MESSAGE_ATTACHMENT_REMOVE = 'message_attachment_remove'

    # --- Settings events ---
    SETTINGS_UPDATE = 'settings_update'
class Event(db.Model):
    """A logged event, optionally tied to the user who triggered it."""

    __tablename__ = 'events'

    id = db.Column(db.Integer, primary_key=True)
    event_type = db.Column(db.String(50), nullable=False)
    # Nullable: an event does not have to belong to a user.
    user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=True)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    details = db.Column(db.JSON)  # Store additional event-specific data
    ip_address = db.Column(db.String(45))  # IPv6 addresses can be up to 45 chars
    user_agent = db.Column(db.String(255))

    # Relationships
    user = db.relationship(
        'User',
        backref=db.backref('events', cascade='all, delete-orphan'),
    )

    def __repr__(self):
        return f'<Event {self.event_type} by User {self.user_id} at {self.timestamp}>'
class NotifType(Enum):
    """String identifiers for notification kinds, grouped by subject.

    NOTE(review): presumably these values are what gets written to
    ``Notif.notif_type`` -- confirm against the callers.
    """

    # --- User notifications ---
    ACCOUNT_CREATED = 'account_created'
    PASSWORD_RESET = 'password_reset'
    ACCOUNT_DELETED = 'account_deleted'
    ACCOUNT_UPDATED = 'account_updated'

    # --- Room notifications ---
    ROOM_INVITE = 'room_invite'
    ROOM_INVITE_REMOVED = 'room_invite_removed'

    # --- Conversation notifications ---
    CONVERSATION_INVITE = 'conversation_invite'
    CONVERSATION_INVITE_REMOVED = 'conversation_invite_removed'
    CONVERSATION_MESSAGE = 'conversation_message'
class Notif(db.Model):
    """A notification addressed to a user, optionally sent by another user."""

    __tablename__ = 'notifs'

    id = db.Column(db.Integer, primary_key=True)
    notif_type = db.Column(db.String(50), nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
    # Nullable: a notification does not need a sending user.
    sender_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=True)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    read = db.Column(db.Boolean, default=False, nullable=False)
    details = db.Column(db.JSON)  # Store additional notification-specific data

    # Relationships
    # Two foreign keys point at ``user``, so each relationship names its own.
    user = db.relationship(
        'User',
        foreign_keys=[user_id],
        backref=db.backref('notifications', cascade='all, delete-orphan'),
    )
    sender = db.relationship(
        'User',
        foreign_keys=[sender_id],
        backref='sent_notifications',
    )

    def __repr__(self):
        return f'<Notif {self.notif_type} for User {self.user_id} at {self.timestamp}>'
class EmailTemplate(db.Model):
    """A named e-mail template (subject and body) created by a user."""

    __tablename__ = 'email_templates'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    subject = db.Column(db.String(200), nullable=False)
    body = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    created_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
    is_active = db.Column(db.Boolean, default=True)

    # Relationships
    creator = db.relationship(
        'User',
        foreign_keys=[created_by],
        backref=db.backref('created_email_templates', cascade='all, delete-orphan'),
    )

    def __repr__(self):
        return f'<EmailTemplate {self.name}>'
class Mail(db.Model):
    """An e-mail record with its delivery status.

    May reference the template it was built from and the notification it
    belongs to; both links are optional.
    """

    __tablename__ = 'mails'

    id = db.Column(db.Integer, primary_key=True)
    recipient = db.Column(db.String(150), nullable=False)
    subject = db.Column(db.String(200), nullable=False)
    body = db.Column(db.Text, nullable=False)
    status = db.Column(db.String(20), default='pending', nullable=False)  # e.g., pending, sent, failed
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    sent_at = db.Column(db.DateTime, nullable=True)
    template_id = db.Column(db.Integer, db.ForeignKey('email_templates.id'), nullable=True)
    notif_id = db.Column(db.Integer, db.ForeignKey('notifs.id'), nullable=True)

    # Relationships
    template = db.relationship('EmailTemplate', backref='mails')
    notif = db.relationship('Notif', backref='mails')

    def __repr__(self):
        return f'<Mail to {self.recipient} status={self.status}>'
class PasswordSetupToken(db.Model):
    """A unique token with an expiry time and a used flag, tied to a user."""

    __tablename__ = 'password_setup_tokens'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
    token = db.Column(db.String(100), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    expires_at = db.Column(db.DateTime, nullable=False)
    used = db.Column(db.Boolean, default=False)

    # Relationships
    user = db.relationship(
        'User',
        backref=db.backref('password_setup_tokens', cascade='all, delete-orphan'),
    )

    def is_valid(self):
        """Return True when the token is unused and has not yet expired.

        Expiry is checked against ``datetime.utcnow()``, i.e. a naive UTC time.
        """
        if self.used:
            return False
        return datetime.utcnow() < self.expires_at

    def __repr__(self):
        return f'<PasswordSetupToken {self.token}>'