722 lines
33 KiB
Python
722 lines
33 KiB
Python
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_login import UserMixin
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from datetime import datetime
|
|
from sqlalchemy.orm import relationship
|
|
from extensions import db
|
|
from enum import Enum
|
|
import json
|
|
|
|
# Many-to-many link table: one row per (room, user) membership pair.
room_members = db.Table(
    'room_members',
    db.Column(
        'room_id',
        db.Integer,
        db.ForeignKey('room.id'),
        primary_key=True,
    ),
    db.Column(
        'user_id',
        db.Integer,
        db.ForeignKey('user.id'),
        primary_key=True,
    ),
)
|
|
|
|
# Many-to-many link table: one row per (conversation, user) membership pair.
conversation_members = db.Table(
    'conversation_members',
    db.Column(
        'conversation_id',
        db.Integer,
        db.ForeignKey('conversation.id'),
        primary_key=True,
    ),
    db.Column(
        'user_id',
        db.Integer,
        db.ForeignKey('user.id'),
        primary_key=True,
    ),
)
|
|
|
|
class User(UserMixin, db.Model):
    """Application account, also used as the Flask-Login user object.

    Stores identity (username, e-mail), role flags (admin / manager),
    optional contact details and the hashed password. Passwords are never
    stored in clear text; use set_password() / check_password().
    """

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    last_name = db.Column(db.String(150), nullable=False, default='--')
    email = db.Column(db.String(150), unique=True, nullable=False)
    # No NOT NULL constraint: an account can exist before a password is set.
    password_hash = db.Column(db.String(256))
    is_admin = db.Column(db.Boolean, default=False)
    is_manager = db.Column(db.Boolean, default=False)  # New field for manager role
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    phone = db.Column(db.String(20))
    company = db.Column(db.String(100))
    position = db.Column(db.String(100))
    notes = db.Column(db.Text)
    # Defined in the class body, so it takes precedence over the
    # `is_active` attribute inherited from UserMixin.
    is_active = db.Column(db.Boolean, default=True)
    profile_picture = db.Column(db.String(255))
    preferred_view = db.Column(db.String(10), default='grid', nullable=False)  # 'grid' or 'list'

    # Per-room permission rows owned by this user.
    room_permissions = relationship(
        'RoomMemberPermission',
        back_populates='user',
        cascade='all, delete-orphan'
    )

    def set_password(self, password):
        """Hash `password` and store the hash on this user."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True if `password` matches the stored hash.

        Returns False when no hash has been stored yet, instead of passing
        None to check_password_hash(), which expects a string hash.
        """
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)

    def __repr__(self):
        return f'<User {self.username}>'
|
|
|
|
class Room(db.Model):
    """A room: created by one user, joined by members, holding files."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    created_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )

    # The creating user; the reverse side is User.created_rooms.
    creator = db.relationship(
        'User',
        foreign_keys=[created_by],
        backref=db.backref('created_rooms', cascade='all, delete-orphan'),
    )
    # Membership through the room_members link table; User.rooms is a
    # dynamic (query-returning) collection.
    members = db.relationship(
        'User',
        secondary=room_members,
        backref=db.backref('rooms', lazy='dynamic'),
    )
    member_permissions = relationship(
        'RoomMemberPermission',
        back_populates='room',
        cascade='all, delete-orphan',
    )
    files = db.relationship(
        'RoomFile',
        back_populates='room',
        cascade='all, delete-orphan',
    )

    def __repr__(self):
        return f'<Room {self.name}>'
|
|
|
|
class RoomMemberPermission(db.Model):
    """Per-user permission flags inside one room.

    The composite primary key (room_id, user_id) allows at most one row
    per user per room. Only `can_view` defaults to True.
    """

    __tablename__ = 'room_member_permissions'

    room_id = db.Column(
        db.Integer,
        db.ForeignKey('room.id'),
        primary_key=True,
    )
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        primary_key=True,
    )

    # Individual capability flags.
    can_view = db.Column(db.Boolean, default=True, nullable=False)
    can_download = db.Column(db.Boolean, default=False, nullable=False)
    can_upload = db.Column(db.Boolean, default=False, nullable=False)
    can_delete = db.Column(db.Boolean, default=False, nullable=False)
    can_rename = db.Column(db.Boolean, default=False, nullable=False)
    can_move = db.Column(db.Boolean, default=False, nullable=False)
    can_share = db.Column(db.Boolean, default=False, nullable=False)

    # Both sides are paired with explicit back_populates on User and Room.
    user = relationship('User', back_populates='room_permissions')
    room = relationship('Room', back_populates='member_permissions')
|
|
|
|
class RoomFile(db.Model):
    """Metadata for a file or folder stored in a room.

    Deletion is tracked on the row itself via `deleted`, `deleted_by`
    and `deleted_at`.
    """

    __tablename__ = 'room_file'

    id = db.Column(db.Integer, primary_key=True)
    room_id = db.Column(db.Integer, db.ForeignKey('room.id'), nullable=False)
    name = db.Column(db.String(255), nullable=False)
    path = db.Column(db.String(255), nullable=False, default='')
    type = db.Column(db.String(10), nullable=False)  # 'file' or 'folder'
    size = db.Column(db.Integer)  # in bytes, null for folders
    modified = db.Column(db.Float)  # timestamp

    # Upload bookkeeping.
    uploaded_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
    )
    uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Deletion bookkeeping.
    deleted = db.Column(db.Boolean, default=False)  # New field for deleted status
    deleted_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
    )
    deleted_at = db.Column(db.DateTime)  # New field for tracking when the file was deleted

    # Two foreign keys point at user.id, so each relationship names the
    # column it follows.
    uploader = db.relationship(
        'User',
        foreign_keys=[uploaded_by],
        backref=db.backref('uploaded_files', cascade='all, delete-orphan'),
    )
    deleter = db.relationship(
        'User',
        foreign_keys=[deleted_by],
        backref=db.backref('deleted_room_files', cascade='all, delete-orphan'),
    )
    room = db.relationship('Room', back_populates='files')
    # Users who starred this file, linked through the user_starred_file table.
    starred_by = db.relationship(
        'User',
        secondary='user_starred_file',
        backref='starred_files',
    )

    def __repr__(self):
        return f'<RoomFile {self.name} ({self.type}) in {self.path}>'
|
|
|
|
class UserStarredFile(db.Model):
    """A star placed by one user on one room file."""

    __tablename__ = 'user_starred_file'

    # Add unique constraint to prevent duplicate stars
    __table_args__ = (
        db.UniqueConstraint('user_id', 'file_id', name='unique_user_file_star'),
    )

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )
    file_id = db.Column(
        db.Integer,
        db.ForeignKey('room_file.id'),
        nullable=False,
    )
    starred_at = db.Column(db.DateTime, default=datetime.utcnow)

    def __repr__(self):
        return f'<UserStarredFile user_id={self.user_id} file_id={self.file_id}>'
|
|
|
|
class TrashedFile(db.Model):
    """Record of a trashed file or folder, including its original path."""

    __tablename__ = 'trashed_file'

    id = db.Column(db.Integer, primary_key=True)
    room_id = db.Column(db.Integer, db.ForeignKey('room.id'), nullable=False)
    name = db.Column(db.String(255), nullable=False)
    original_path = db.Column(db.String(255), nullable=False, default='')
    type = db.Column(db.String(10), nullable=False)  # 'file' or 'folder'
    size = db.Column(db.Integer)  # in bytes, null for folders
    modified = db.Column(db.Float)  # timestamp

    uploaded_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
    )
    uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Unlike uploaded_by, the deleting user is mandatory.
    deleted_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )
    deleted_at = db.Column(db.DateTime, default=datetime.utcnow)

    room = db.relationship('Room', backref='trashed_files')
    # Two foreign keys point at user.id, so each relationship names its column.
    uploader = db.relationship(
        'User',
        backref=db.backref('uploaded_trashed_files', cascade='all, delete-orphan'),
        foreign_keys=[uploaded_by],
    )
    deleter = db.relationship(
        'User',
        backref=db.backref('deleted_trashed_files', cascade='all, delete-orphan'),
        foreign_keys=[deleted_by],
    )

    def __repr__(self):
        return f'<TrashedFile {self.name} ({self.type}) from {self.original_path}>'
|
|
|
|
class SiteSettings(db.Model):
    """Branding colours and company profile shown by the site."""

    id = db.Column(db.Integer, primary_key=True)

    # Theme colours.
    primary_color = db.Column(db.String(7), default='#16767b')  # Default from colors.css
    secondary_color = db.Column(db.String(7), default='#741b5f')  # Default from colors.css

    # Company profile.
    company_name = db.Column(db.String(100))
    company_logo = db.Column(db.String(255))  # Store the filename of the logo
    company_website = db.Column(db.String(200))
    company_email = db.Column(db.String(100))
    company_phone = db.Column(db.String(20))
    company_address = db.Column(db.String(200))
    company_city = db.Column(db.String(100))
    company_state = db.Column(db.String(100))
    company_zip = db.Column(db.String(20))
    company_country = db.Column(db.String(100))
    company_description = db.Column(db.Text)
    company_industry = db.Column(db.String(100))

    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    @classmethod
    def get_settings(cls):
        """Return the first settings row, creating and committing one if none exists."""
        existing = cls.query.first()
        if existing:
            return existing

        created = cls()
        db.session.add(created)
        db.session.commit()
        return created
|
|
|
|
class DocuPulseSettings(db.Model):
    """Instance-wide limits on rooms, conversations and storage.

    get_settings() reads the first row and creates one with the default
    limits when the table is empty. get_usage_stats() compares the current
    usage against those limits.
    """

    __tablename__ = 'docupulse_settings'

    # Single source for the default limits: used by the column defaults,
    # by first-row creation and by the get_usage_stats() fallback.
    DEFAULT_MAX_ROOMS = 10
    DEFAULT_MAX_CONVERSATIONS = 10
    DEFAULT_MAX_STORAGE = 10737418240  # 10GB in bytes

    id = db.Column(db.Integer, primary_key=True)
    max_rooms = db.Column(db.Integer, default=DEFAULT_MAX_ROOMS)
    max_conversations = db.Column(db.Integer, default=DEFAULT_MAX_CONVERSATIONS)
    max_storage = db.Column(db.BigInteger, default=DEFAULT_MAX_STORAGE)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    @staticmethod
    def _percentage(current, maximum):
        """Return `current` as a percentage of `maximum`.

        Returns 0 when `maximum` is None, zero or negative, so a NULL limit
        column cannot raise a TypeError or a division error.
        """
        if not maximum or maximum <= 0:
            return 0
        return (current / maximum) * 100

    @classmethod
    def get_settings(cls):
        """Return the settings row, creating it with defaults if missing.

        Returns:
            The settings instance, or None when the database operation
            fails. In that case the session is rolled back and the error
            is logged.
        """
        import logging

        try:
            settings = cls.query.first()
            if not settings:
                settings = cls(
                    max_rooms=cls.DEFAULT_MAX_ROOMS,
                    max_conversations=cls.DEFAULT_MAX_CONVERSATIONS,
                    max_storage=cls.DEFAULT_MAX_STORAGE,
                )
                db.session.add(settings)
                db.session.commit()
            return settings
        except Exception:
            # If there's an error (like integer overflow), rollback and
            # return None. This stays best-effort for callers, but the
            # failure is now logged instead of vanishing silently.
            db.session.rollback()
            logging.getLogger(__name__).exception(
                'Could not load or create DocuPulseSettings'
            )
            return None

    @classmethod
    def get_usage_stats(cls):
        """Return limits, current usage and usage percentages as a dict.

        Keys: max_rooms, max_conversations, max_storage, current_rooms,
        current_conversations, current_storage, rooms_percentage,
        conversations_percentage, storage_percentage.

        When the settings cannot be retrieved, the default limits are
        returned with all usage figures set to 0.
        """
        settings = cls.get_settings()
        if not settings:
            # Return default values if settings can't be retrieved
            return {
                'max_rooms': cls.DEFAULT_MAX_ROOMS,
                'max_conversations': cls.DEFAULT_MAX_CONVERSATIONS,
                'max_storage': cls.DEFAULT_MAX_STORAGE,
                'current_rooms': 0,
                'current_conversations': 0,
                'current_storage': 0,
                'rooms_percentage': 0,
                'conversations_percentage': 0,
                'storage_percentage': 0
            }

        total_rooms = Room.query.count()
        total_conversations = Conversation.query.count()
        # SUM() yields NULL when no row matches, hence the `or 0`.
        # `== False` is a SQL expression here, not a Python comparison.
        total_storage = (
            db.session.query(db.func.sum(RoomFile.size))
            .filter(RoomFile.deleted == False)  # noqa: E712
            .scalar()
            or 0
        )

        return {
            'max_rooms': settings.max_rooms,
            'max_conversations': settings.max_conversations,
            'max_storage': settings.max_storage,
            'current_rooms': total_rooms,
            'current_conversations': total_conversations,
            'current_storage': total_storage,
            'rooms_percentage': cls._percentage(total_rooms, settings.max_rooms),
            'conversations_percentage': cls._percentage(total_conversations, settings.max_conversations),
            'storage_percentage': cls._percentage(total_storage, settings.max_storage)
        }
|
|
|
|
class KeyValueSettings(db.Model):
    """Generic key/value store with text-encoded values.

    dict and list values are stored as JSON; every other value is stored
    via str(). On read the text is parsed as JSON when possible, otherwise
    returned unchanged. Values whose str() form is not valid JSON (for
    example True -> 'True') therefore come back as strings.
    """

    id = db.Column(db.Integer, primary_key=True)
    key = db.Column(db.String(100), unique=True, nullable=False)
    value = db.Column(db.Text)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    @classmethod
    def get_value(cls, key, default=None):
        """Return the decoded value stored under `key`, or `default` if absent."""
        row = cls.query.filter_by(key=key).first()
        if not row:
            return default

        try:
            return json.loads(row.value)
        except (json.JSONDecodeError, TypeError):
            # Not JSON (or NULL): hand back the stored value as-is.
            return row.value

    @classmethod
    def set_value(cls, key, value):
        """Create or update the row for `key`, commit, and return the row."""
        row = cls.query.filter_by(key=key).first()
        if not row:
            row = cls(key=key)

        is_container = isinstance(value, (dict, list))
        row.value = json.dumps(value) if is_container else str(value)

        db.session.add(row)
        db.session.commit()
        return row
|
|
|
|
class Conversation(db.Model):
    """A named conversation with a creator, members and messages."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    created_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )

    # The creating user; the reverse side is User.created_conversations.
    creator = db.relationship(
        'User',
        foreign_keys=[created_by],
        backref=db.backref('created_conversations', cascade='all, delete-orphan'),
    )
    # Membership through the conversation_members link table;
    # User.conversations is a dynamic (query-returning) collection.
    members = db.relationship(
        'User',
        secondary=conversation_members,
        backref=db.backref('conversations', lazy='dynamic'),
    )
    messages = db.relationship(
        'Message',
        back_populates='conversation',
        cascade='all, delete-orphan',
    )

    def __repr__(self):
        return f'<Conversation {self.name}>'
|
|
|
|
class Message(db.Model):
    """A message posted by a user in a conversation."""

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    conversation_id = db.Column(
        db.Integer,
        db.ForeignKey('conversation.id'),
        nullable=False,
    )
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )

    conversation = db.relationship('Conversation', back_populates='messages')
    # The author; the reverse side is User.messages.
    user = db.relationship(
        'User',
        backref=db.backref('messages', cascade='all, delete-orphan'),
    )
    attachments = db.relationship(
        'MessageAttachment',
        back_populates='message',
        cascade='all, delete-orphan',
    )

    def __repr__(self):
        return f'<Message {self.id}>'
|
|
|
|
class MessageAttachment(db.Model):
    """Metadata for a file attached to a message."""

    id = db.Column(db.Integer, primary_key=True)
    message_id = db.Column(
        db.Integer,
        db.ForeignKey('message.id'),
        nullable=False,
    )
    name = db.Column(db.String(255), nullable=False)
    path = db.Column(db.String(512), nullable=False)
    type = db.Column(db.String(100))
    size = db.Column(db.Integer)  # Size in bytes
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Paired with Message.attachments.
    message = db.relationship('Message', back_populates='attachments')

    def __repr__(self):
        return f'<MessageAttachment {self.name}>'
|
|
|
|
class EventType(Enum):
    """Names of the event kinds, grouped by area.

    Each member's value is the lower-case form of its name.
    """

    # --- Users ---
    USER_LOGIN = 'user_login'
    USER_LOGOUT = 'user_logout'
    USER_CREATE = 'user_create'
    USER_UPDATE = 'user_update'
    USER_DELETE = 'user_delete'

    # --- Rooms ---
    ROOM_CREATE = 'room_create'
    ROOM_UPDATE = 'room_update'
    ROOM_DELETE = 'room_delete'
    ROOM_MEMBER_ADD = 'room_member_add'
    ROOM_MEMBER_REMOVE = 'room_member_remove'
    ROOM_PERMISSION_UPDATE = 'room_permission_update'

    # --- Files ---
    FILE_UPLOAD = 'file_upload'
    FILE_DOWNLOAD = 'file_download'
    FILE_DELETE = 'file_delete'
    FILE_RENAME = 'file_rename'
    FILE_MOVE = 'file_move'
    FILE_STAR = 'file_star'
    FILE_UNSTAR = 'file_unstar'

    # --- Conversations ---
    CONVERSATION_CREATE = 'conversation_create'
    CONVERSATION_UPDATE = 'conversation_update'
    CONVERSATION_DELETE = 'conversation_delete'
    CONVERSATION_MEMBER_ADD = 'conversation_member_add'
    CONVERSATION_MEMBER_REMOVE = 'conversation_member_remove'
    CONVERSATION_OPEN = 'conversation_open'

    # --- Messages ---
    MESSAGE_CREATE = 'message_create'
    MESSAGE_UPDATE = 'message_update'
    MESSAGE_DELETE = 'message_delete'
    MESSAGE_ATTACHMENT_ADD = 'message_attachment_add'
    MESSAGE_ATTACHMENT_REMOVE = 'message_attachment_remove'

    # --- Settings ---
    SETTINGS_UPDATE = 'settings_update'
|
|
|
|
class Event(db.Model):
    """One logged event: what happened, who did it, when and from where."""

    __tablename__ = 'events'

    id = db.Column(db.Integer, primary_key=True)
    event_type = db.Column(db.String(50), nullable=False)
    # Nullable: an event does not have to be tied to a user.
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=True,
    )
    timestamp = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    details = db.Column(db.JSON)  # Store additional event-specific data
    ip_address = db.Column(db.String(45))  # IPv6 addresses can be up to 45 chars
    user_agent = db.Column(db.String(255))

    # The acting user; the reverse side is User.events.
    user = db.relationship(
        'User',
        backref=db.backref('events', cascade='all, delete-orphan'),
    )

    def __repr__(self):
        return f'<Event {self.event_type} by User {self.user_id} at {self.timestamp}>'
|
|
|
|
class NotifType(Enum):
    """Names of the notification kinds, grouped by area.

    Each member's value is the lower-case form of its name.
    """

    # --- Account ---
    ACCOUNT_CREATED = 'account_created'
    PASSWORD_RESET = 'password_reset'
    ACCOUNT_DELETED = 'account_deleted'
    ACCOUNT_UPDATED = 'account_updated'

    # --- Rooms ---
    ROOM_INVITE = 'room_invite'
    ROOM_INVITE_REMOVED = 'room_invite_removed'

    # --- Conversations ---
    CONVERSATION_INVITE = 'conversation_invite'
    CONVERSATION_INVITE_REMOVED = 'conversation_invite_removed'
    CONVERSATION_MESSAGE = 'conversation_message'
|
|
|
|
class Notif(db.Model):
    """A notification addressed to a user, optionally sent by another user."""

    __tablename__ = 'notifs'

    id = db.Column(db.Integer, primary_key=True)
    notif_type = db.Column(db.String(50), nullable=False)
    # Recipient (required) and sender (optional) both reference user.id.
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )
    sender_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=True,
    )
    timestamp = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    read = db.Column(db.Boolean, default=False, nullable=False)
    details = db.Column(db.JSON)  # Store additional notification-specific data

    # Two foreign keys point at user.id, so each relationship names its column.
    user = db.relationship(
        'User',
        backref=db.backref('notifications', cascade='all, delete-orphan'),
        foreign_keys=[user_id],
    )
    sender = db.relationship(
        'User',
        backref='sent_notifications',
        foreign_keys=[sender_id],
    )

    def __repr__(self):
        return f'<Notif {self.notif_type} for User {self.user_id} at {self.timestamp}>'
|
|
|
|
class EmailTemplate(db.Model):
    """A named e-mail template with subject and body."""

    __tablename__ = 'email_templates'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    subject = db.Column(db.String(200), nullable=False)
    body = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )
    created_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )
    is_active = db.Column(db.Boolean, default=True)

    # The creating user; the reverse side is User.created_email_templates.
    creator = db.relationship(
        'User',
        foreign_keys=[created_by],
        backref=db.backref('created_email_templates', cascade='all, delete-orphan'),
    )

    def __repr__(self):
        return f'<EmailTemplate {self.name}>'
|
|
|
|
class Mail(db.Model):
    """An e-mail record with its delivery status.

    May reference the template it was built from and the notification it
    belongs to; both links are optional.
    """

    __tablename__ = 'mails'

    id = db.Column(db.Integer, primary_key=True)
    recipient = db.Column(db.String(150), nullable=False)
    subject = db.Column(db.String(200), nullable=False)
    body = db.Column(db.Text, nullable=False)
    status = db.Column(db.String(20), default='pending', nullable=False)  # e.g., pending, sent, failed
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    sent_at = db.Column(db.DateTime, nullable=True)
    template_id = db.Column(
        db.Integer,
        db.ForeignKey('email_templates.id'),
        nullable=True,
    )
    notif_id = db.Column(
        db.Integer,
        db.ForeignKey('notifs.id'),
        nullable=True,
    )

    # Reverse sides: EmailTemplate.mails and Notif.mails.
    template = db.relationship('EmailTemplate', backref='mails')
    notif = db.relationship('Notif', backref='mails')

    def __repr__(self):
        return f'<Mail to {self.recipient} status={self.status}>'
|
|
|
|
class PasswordSetupToken(db.Model):
    """Token with an expiry time that belongs to one user.

    A token is valid while it is unused and not yet expired.
    """

    __tablename__ = 'password_setup_tokens'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
    token = db.Column(db.String(100), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    expires_at = db.Column(db.DateTime, nullable=False)
    used = db.Column(db.Boolean, default=False)

    # Relationships
    user = db.relationship('User', backref=db.backref('password_setup_tokens', cascade='all, delete-orphan'))

    def is_valid(self):
        """Return True if the token is unused and `expires_at` is still in the future.

        The comparison uses naive UTC (datetime.utcnow()).
        """
        return not self.used and datetime.utcnow() < self.expires_at

    def __repr__(self):
        # The token value is a credential, so it is deliberately kept out of
        # repr(): reprs end up in logs and tracebacks.
        return f'<PasswordSetupToken id={self.id} user_id={self.user_id}>'
|
|
|
|
class PasswordResetToken(db.Model):
    """Password-reset token with an expiry time that belongs to one user.

    A token is valid while it is unused and not yet expired. An IP address
    can be stored alongside the token.
    """

    __tablename__ = 'password_reset_tokens'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
    token = db.Column(db.String(100), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    expires_at = db.Column(db.DateTime, nullable=False)
    used = db.Column(db.Boolean, default=False)
    ip_address = db.Column(db.String(45))  # Store IP address for security

    # Relationships
    user = db.relationship('User', backref=db.backref('password_reset_tokens', cascade='all, delete-orphan'))

    def is_valid(self):
        """Return True if the token is unused and `expires_at` is still in the future.

        The comparison uses naive UTC (datetime.utcnow()).
        """
        return not self.used and datetime.utcnow() < self.expires_at

    def __repr__(self):
        # The token value is a credential, so it is deliberately kept out of
        # repr(): reprs end up in logs and tracebacks.
        return f'<PasswordResetToken id={self.id} user_id={self.user_id}>'
|
|
|
|
def user_has_permission(room, perm_name):
    """
    Check if the current user has a specific permission in a room.

    Args:
        room: Room object
        perm_name: Name of the permission to check (e.g., 'can_view', 'can_upload')

    Returns:
        bool: True if user has permission, False otherwise
    """
    # The module-level flask_login import only brings in UserMixin, so
    # current_user is imported here to keep this function self-contained.
    from flask_login import current_user

    # Admin and manager users have all permissions. getattr() is used so a
    # user object without these role attributes (e.g. an anonymous user) is
    # treated as having no role instead of raising AttributeError.
    if getattr(current_user, 'is_admin', False) or getattr(current_user, 'is_manager', False):
        return True

    # Check if user is a member of the room
    if current_user not in room.members:
        return False

    # Get user's permissions for this room
    permission = RoomMemberPermission.query.filter_by(
        room_id=room.id,
        user_id=current_user.id
    ).first()

    # If no specific permissions are set, user only has view access
    if not permission:
        return perm_name == 'can_view'

    # Only the can_* flags are permissions. Without this guard a name such
    # as 'room' or 'user' would return a truthy related object.
    if not perm_name.startswith('can_'):
        return False

    # Check the specific permission
    return bool(getattr(permission, perm_name, False))
|
|
|
|
class ManagementAPIKey(db.Model):
    """A named API key with creation and last-use timestamps."""

    __tablename__ = 'management_api_keys'

    id = db.Column(db.Integer, primary_key=True)
    api_key = db.Column(db.String(100), unique=True, nullable=False)
    name = db.Column(db.String(100), nullable=False)  # Name/description of the management tool
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_used_at = db.Column(db.DateTime)
    is_active = db.Column(db.Boolean, default=True)
    created_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='SET NULL'),
    )

    # NOTE(review): the foreign key above says ON DELETE SET NULL, while this
    # backref cascades 'all, delete-orphan' from the user side. Confirm which
    # of the two behaviours is intended.
    creator = db.relationship(
        'User',
        backref=db.backref('created_api_keys', cascade='all, delete-orphan'),
    )

    def __repr__(self):
        return f'<ManagementAPIKey {self.name}>'
|
|
|
|
class Instance(db.Model):
    """A tracked instance: identity, usage counters, status and version info."""

    __tablename__ = 'instances'

    id = db.Column(db.Integer, primary_key=True)

    # Identity.
    name = db.Column(db.String(100), unique=True, nullable=False)
    company = db.Column(db.String(100), nullable=False)

    # Usage counters.
    rooms_count = db.Column(db.Integer, nullable=False, default=0)
    conversations_count = db.Column(db.Integer, nullable=False, default=0)
    data_size = db.Column(db.Float, nullable=False, default=0.0)

    payment_plan = db.Column(db.String(20), nullable=False, default='Basic')
    main_url = db.Column(db.String(255), unique=True, nullable=False)

    # Status and connection.
    status = db.Column(db.String(20), nullable=False, default='inactive')
    status_details = db.Column(db.Text, nullable=True)
    connection_token = db.Column(db.String(64), unique=True, nullable=True)

    # Portainer integration fields
    portainer_stack_id = db.Column(db.String(100), nullable=True)  # Portainer stack ID
    portainer_stack_name = db.Column(db.String(100), nullable=True)  # Portainer stack name

    # Version tracking fields
    deployed_version = db.Column(db.String(50), nullable=True)  # Current deployed version (commit hash or tag)
    deployed_branch = db.Column(db.String(100), nullable=True)  # Branch that was deployed
    latest_version = db.Column(db.String(50), nullable=True)  # Latest version available in Git
    version_checked_at = db.Column(db.DateTime, nullable=True)  # When version was last checked

    # Timestamps are produced by the database (CURRENT_TIMESTAMP), not by
    # Python-side defaults.
    created_at = db.Column(
        db.DateTime,
        nullable=False,
        server_default=db.text('CURRENT_TIMESTAMP'),
    )
    updated_at = db.Column(
        db.DateTime,
        nullable=False,
        server_default=db.text('CURRENT_TIMESTAMP'),
        onupdate=db.text('CURRENT_TIMESTAMP'),
    )

    def __repr__(self):
        return f'<Instance {self.name}>'
|
|
|
|
class HelpArticle(db.Model):
    """A help article filed under one category.

    Query helpers order articles by `order_index` ascending, then by
    `created_at` descending.
    """

    __tablename__ = 'help_articles'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    category = db.Column(db.String(50), nullable=False)  # getting-started, user-management, file-management, communication, security, administration
    body = db.Column(db.Text, nullable=False)  # Rich text content
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )
    created_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )
    is_published = db.Column(db.Boolean, default=True)
    order_index = db.Column(db.Integer, default=0)  # For ordering articles within categories

    # The creating user; the reverse side is User.created_help_articles.
    creator = db.relationship(
        'User',
        foreign_keys=[created_by],
        backref=db.backref('created_help_articles', cascade='all, delete-orphan'),
    )

    def __repr__(self):
        return f'<HelpArticle {self.title} ({self.category})>'

    @classmethod
    def get_categories(cls):
        """Return a dict mapping each category slug to its display name."""
        return {
            'getting-started': 'Getting Started',
            'user-management': 'User Management',
            'file-management': 'File Management',
            'communication': 'Communication',
            'security': 'Security & Privacy',
            'administration': 'Administration'
        }

    @classmethod
    def get_articles_by_category(cls, category, published_only=True):
        """Return the ordered articles of `category`.

        With `published_only` (the default) only published articles are
        included.
        """
        criteria = {'category': category}
        if published_only:
            criteria['is_published'] = True

        matching = cls.query.filter_by(**criteria)
        ordered = matching.order_by(cls.order_index.asc(), cls.created_at.desc())
        return ordered.all()

    @classmethod
    def get_all_published(cls):
        """Return published articles as a dict of category -> ordered list."""
        published = (
            cls.query
            .filter_by(is_published=True)
            .order_by(cls.order_index.asc(), cls.created_at.desc())
            .all()
        )

        by_category = {}
        for entry in published:
            by_category.setdefault(entry.category, []).append(entry)
        return by_category
|
|
|
|
class PricingPlan(db.Model):
    """A purchasable plan: prices, marketing fields, Stripe ids and quotas.

    For every quota column, 0 means "unlimited". A quota that is None
    (an object not yet flushed, or a NULL row) is treated like the column
    default 0, so the quota helpers never raise on it.
    """

    __tablename__ = 'pricing_plans'

    # Recognised quota types mapped to the text appended after the number
    # in format_quota_display(). Shared by all quota helpers so they agree
    # on which quota types exist.
    _QUOTA_SUFFIXES = {
        'room_quota': ' rooms',
        'conversation_quota': ' conversations',
        'storage_quota_gb': 'GB',
        'manager_quota': ' managers',
        'admin_quota': ' admins',
    }

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    monthly_price = db.Column(db.Float, nullable=False)
    annual_price = db.Column(db.Float, nullable=False)
    features = db.Column(db.JSON, nullable=False)  # List of feature strings
    is_popular = db.Column(db.Boolean, default=False)
    is_custom = db.Column(db.Boolean, default=False)
    button_text = db.Column(db.String(50), default='Get Started')
    button_url = db.Column(db.String(200), default='#')
    # Stripe integration fields
    stripe_product_id = db.Column(db.String(100), nullable=True)
    stripe_monthly_price_id = db.Column(db.String(100), nullable=True)
    stripe_annual_price_id = db.Column(db.String(100), nullable=True)
    # Deprecated: Stripe payment links (to be removed in a future migration)
    monthly_stripe_link = db.Column(db.String(500), nullable=True)  # DEPRECATED
    annual_stripe_link = db.Column(db.String(500), nullable=True)  # DEPRECATED
    order_index = db.Column(db.Integer, default=0)
    is_active = db.Column(db.Boolean, default=True)
    # Quota fields
    room_quota = db.Column(db.Integer, default=0)  # 0 = unlimited
    conversation_quota = db.Column(db.Integer, default=0)  # 0 = unlimited
    storage_quota_gb = db.Column(db.Integer, default=0)  # 0 = unlimited, stored in GB
    manager_quota = db.Column(db.Integer, default=0)  # 0 = unlimited
    admin_quota = db.Column(db.Integer, default=0)  # 0 = unlimited
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    created_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)

    # Relationships
    creator = db.relationship('User', backref=db.backref('created_pricing_plans', cascade='all, delete-orphan'), foreign_keys=[created_by])

    def __repr__(self):
        return f'<PricingPlan {self.name}>'

    @classmethod
    def get_active_plans(cls):
        """Get all active pricing plans ordered by order_index"""
        return cls.query.filter_by(is_active=True).order_by(cls.order_index).all()

    @classmethod
    def get_popular_plan(cls):
        """Get the plan marked as most popular"""
        return cls.query.filter_by(is_active=True, is_popular=True).first()

    def _quota_limit(self, quota_type):
        """Return the limit stored for a recognised `quota_type`; None counts as 0 (unlimited)."""
        return getattr(self, quota_type) or 0

    def get_storage_quota_bytes(self):
        """Get storage quota in bytes (0 means unlimited)."""
        limit_gb = self._quota_limit('storage_quota_gb')
        if limit_gb == 0:
            return 0  # Unlimited
        return limit_gb * 1024 * 1024 * 1024  # Convert GB to bytes

    def format_quota_display(self, quota_type):
        """Format quota for display.

        Returns 'Unlimited' for a zero/unset quota, '<n><unit>' otherwise,
        and 'Unknown' for an unrecognised `quota_type`.
        """
        suffix = self._QUOTA_SUFFIXES.get(quota_type)
        if suffix is None:
            return 'Unknown'
        limit = self._quota_limit(quota_type)
        return 'Unlimited' if limit == 0 else f'{limit}{suffix}'

    def check_quota(self, quota_type, current_count):
        """Check whether `current_count` is still below the quota.

        Returns True when the quota is unlimited, when `current_count` is
        below the limit, or when `quota_type` is not recognised.
        """
        if quota_type not in self._QUOTA_SUFFIXES:
            return True
        limit = self._quota_limit(quota_type)
        return limit == 0 or current_count < limit

    def get_quota_remaining(self, quota_type, current_count):
        """Get remaining quota.

        Returns float('inf') for an unlimited quota, the non-negative
        difference between limit and `current_count` otherwise, and 0 for
        an unrecognised `quota_type`.
        """
        if quota_type not in self._QUOTA_SUFFIXES:
            return 0
        limit = self._quota_limit(quota_type)
        return float('inf') if limit == 0 else max(0, limit - current_count)
|
|
|
|
class Customer(db.Model):
    """A customer: contact details, addresses, tax id and subscription state."""

    __tablename__ = 'customer'

    id = db.Column(db.Integer, primary_key=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
        nullable=False,
    )

    # Contact details. The e-mail is indexed but carries no unique constraint.
    email = db.Column(db.String(150), nullable=False, index=True)
    name = db.Column(db.String(150))
    phone = db.Column(db.String(50))

    # Billing address.
    billing_address_line1 = db.Column(db.String(255))
    billing_address_line2 = db.Column(db.String(255))
    billing_city = db.Column(db.String(100))
    billing_state = db.Column(db.String(100))
    billing_postal_code = db.Column(db.String(20))
    billing_country = db.Column(db.String(100))

    # Shipping address.
    shipping_address_line1 = db.Column(db.String(255))
    shipping_address_line2 = db.Column(db.String(255))
    shipping_city = db.Column(db.String(100))
    shipping_state = db.Column(db.String(100))
    shipping_postal_code = db.Column(db.String(20))
    shipping_country = db.Column(db.String(100))

    # Tax identification.
    tax_id_type = db.Column(db.String(50))
    tax_id_value = db.Column(db.String(100))

    # Stripe identifiers and subscription state.
    stripe_customer_id = db.Column(db.String(255))
    stripe_subscription_id = db.Column(db.String(255))
    subscription_status = db.Column(db.String(50))
    subscription_plan_id = db.Column(
        db.Integer,
        db.ForeignKey('pricing_plans.id'),
    )
    subscription_billing_cycle = db.Column(db.String(20))
    subscription_current_period_start = db.Column(db.DateTime)
    subscription_current_period_end = db.Column(db.DateTime)

    # Relationship to pricing plan
    plan = db.relationship('PricingPlan', backref='customers')

    def __repr__(self):
        return f'<Customer {self.email}>'