535 lines
23 KiB
Python
535 lines
23 KiB
Python
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_login import UserMixin
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from datetime import datetime
|
|
from sqlalchemy.orm import relationship
|
|
from extensions import db
|
|
from enum import Enum
|
|
import json
|
|
|
|
# Association table for room members
|
|
room_members = db.Table('room_members',
|
|
db.Column('room_id', db.Integer, db.ForeignKey('room.id'), primary_key=True),
|
|
db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True)
|
|
)
|
|
|
|
# Association table for conversation members
|
|
conversation_members = db.Table('conversation_members',
|
|
db.Column('conversation_id', db.Integer, db.ForeignKey('conversation.id'), primary_key=True),
|
|
db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True)
|
|
)
|
|
|
|
class User(UserMixin, db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(150), unique=True, nullable=False)
|
|
last_name = db.Column(db.String(150), nullable=False, default='--')
|
|
email = db.Column(db.String(150), unique=True, nullable=False)
|
|
password_hash = db.Column(db.String(256))
|
|
is_admin = db.Column(db.Boolean, default=False)
|
|
is_manager = db.Column(db.Boolean, default=False) # New field for manager role
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
phone = db.Column(db.String(20))
|
|
company = db.Column(db.String(100))
|
|
position = db.Column(db.String(100))
|
|
notes = db.Column(db.Text)
|
|
is_active = db.Column(db.Boolean, default=True)
|
|
profile_picture = db.Column(db.String(255))
|
|
preferred_view = db.Column(db.String(10), default='grid', nullable=False) # 'grid' or 'list'
|
|
room_permissions = relationship(
|
|
'RoomMemberPermission',
|
|
back_populates='user',
|
|
cascade='all, delete-orphan'
|
|
)
|
|
|
|
def set_password(self, password):
|
|
self.password_hash = generate_password_hash(password)
|
|
|
|
def check_password(self, password):
|
|
return check_password_hash(self.password_hash, password)
|
|
|
|
def __repr__(self):
|
|
return f'<User {self.username}>'
|
|
|
|
class Room(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(100), nullable=False)
|
|
description = db.Column(db.Text)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
created_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
|
|
|
|
# Relationships
|
|
creator = db.relationship('User', backref=db.backref('created_rooms', cascade='all, delete-orphan'), foreign_keys=[created_by])
|
|
members = db.relationship('User', secondary=room_members, backref=db.backref('rooms', lazy='dynamic'))
|
|
member_permissions = relationship('RoomMemberPermission', back_populates='room', cascade='all, delete-orphan')
|
|
files = db.relationship('RoomFile', back_populates='room', cascade='all, delete-orphan')
|
|
|
|
def __repr__(self):
|
|
return f'<Room {self.name}>'
|
|
|
|
# Association table for room members with permissions
|
|
class RoomMemberPermission(db.Model):
|
|
__tablename__ = 'room_member_permissions'
|
|
room_id = db.Column(db.Integer, db.ForeignKey('room.id'), primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), primary_key=True)
|
|
can_view = db.Column(db.Boolean, default=True, nullable=False)
|
|
can_download = db.Column(db.Boolean, default=False, nullable=False)
|
|
can_upload = db.Column(db.Boolean, default=False, nullable=False)
|
|
can_delete = db.Column(db.Boolean, default=False, nullable=False)
|
|
can_rename = db.Column(db.Boolean, default=False, nullable=False)
|
|
can_move = db.Column(db.Boolean, default=False, nullable=False)
|
|
can_share = db.Column(db.Boolean, default=False, nullable=False)
|
|
# Relationships
|
|
user = relationship('User', back_populates='room_permissions')
|
|
room = relationship('Room', back_populates='member_permissions')
|
|
|
|
class RoomFile(db.Model):
|
|
__tablename__ = 'room_file'
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
room_id = db.Column(db.Integer, db.ForeignKey('room.id'), nullable=False)
|
|
name = db.Column(db.String(255), nullable=False)
|
|
path = db.Column(db.String(255), nullable=False, default='')
|
|
type = db.Column(db.String(10), nullable=False) # 'file' or 'folder'
|
|
size = db.Column(db.Integer) # in bytes, null for folders
|
|
modified = db.Column(db.Float) # timestamp
|
|
uploaded_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
|
|
uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
deleted = db.Column(db.Boolean, default=False) # New field for deleted status
|
|
deleted_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
|
|
deleted_at = db.Column(db.DateTime) # New field for tracking when the file was deleted
|
|
uploader = db.relationship('User', backref=db.backref('uploaded_files', cascade='all, delete-orphan'), foreign_keys=[uploaded_by])
|
|
deleter = db.relationship('User', backref=db.backref('deleted_room_files', cascade='all, delete-orphan'), foreign_keys=[deleted_by])
|
|
room = db.relationship('Room', back_populates='files')
|
|
starred_by = db.relationship('User', secondary='user_starred_file', backref='starred_files')
|
|
|
|
def __repr__(self):
|
|
return f'<RoomFile {self.name} ({self.type}) in {self.path}>'
|
|
|
|
class UserStarredFile(db.Model):
|
|
__tablename__ = 'user_starred_file'
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
|
|
file_id = db.Column(db.Integer, db.ForeignKey('room_file.id'), nullable=False)
|
|
starred_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
|
# Add unique constraint to prevent duplicate stars
|
|
__table_args__ = (
|
|
db.UniqueConstraint('user_id', 'file_id', name='unique_user_file_star'),
|
|
)
|
|
|
|
def __repr__(self):
|
|
return f'<UserStarredFile user_id={self.user_id} file_id={self.file_id}>'
|
|
|
|
class TrashedFile(db.Model):
|
|
__tablename__ = 'trashed_file'
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
room_id = db.Column(db.Integer, db.ForeignKey('room.id'), nullable=False)
|
|
name = db.Column(db.String(255), nullable=False)
|
|
original_path = db.Column(db.String(255), nullable=False, default='')
|
|
type = db.Column(db.String(10), nullable=False) # 'file' or 'folder'
|
|
size = db.Column(db.Integer) # in bytes, null for folders
|
|
modified = db.Column(db.Float) # timestamp
|
|
uploaded_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'))
|
|
uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
deleted_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
|
|
deleted_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
room = db.relationship('Room', backref='trashed_files')
|
|
uploader = db.relationship('User', foreign_keys=[uploaded_by], backref=db.backref('uploaded_trashed_files', cascade='all, delete-orphan'))
|
|
deleter = db.relationship('User', foreign_keys=[deleted_by], backref=db.backref('deleted_trashed_files', cascade='all, delete-orphan'))
|
|
|
|
def __repr__(self):
|
|
return f'<TrashedFile {self.name} ({self.type}) from {self.original_path}>'
|
|
|
|
class SiteSettings(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
primary_color = db.Column(db.String(7), default='#16767b') # Default from colors.css
|
|
secondary_color = db.Column(db.String(7), default='#741b5f') # Default from colors.css
|
|
company_name = db.Column(db.String(100))
|
|
company_logo = db.Column(db.String(255)) # Store the filename of the logo
|
|
company_website = db.Column(db.String(200))
|
|
company_email = db.Column(db.String(100))
|
|
company_phone = db.Column(db.String(20))
|
|
company_address = db.Column(db.String(200))
|
|
company_city = db.Column(db.String(100))
|
|
company_state = db.Column(db.String(100))
|
|
company_zip = db.Column(db.String(20))
|
|
company_country = db.Column(db.String(100))
|
|
company_description = db.Column(db.Text)
|
|
company_industry = db.Column(db.String(100))
|
|
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
|
|
|
@classmethod
|
|
def get_settings(cls):
|
|
settings = cls.query.first()
|
|
if not settings:
|
|
settings = cls()
|
|
db.session.add(settings)
|
|
db.session.commit()
|
|
return settings
|
|
|
|
class DocuPulseSettings(db.Model):
|
|
__tablename__ = 'docupulse_settings'
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
max_rooms = db.Column(db.Integer, default=10)
|
|
max_conversations = db.Column(db.Integer, default=10)
|
|
max_storage = db.Column(db.BigInteger, default=10737418240) # 10GB in bytes
|
|
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
|
|
|
@classmethod
|
|
def get_settings(cls):
|
|
try:
|
|
settings = cls.query.first()
|
|
if not settings:
|
|
settings = cls(
|
|
max_rooms=10,
|
|
max_conversations=10,
|
|
max_storage=10737418240 # 10GB in bytes
|
|
)
|
|
db.session.add(settings)
|
|
db.session.commit()
|
|
return settings
|
|
except Exception as e:
|
|
# If there's an error (like integer overflow), rollback and return None
|
|
db.session.rollback()
|
|
return None
|
|
|
|
@classmethod
|
|
def get_usage_stats(cls):
|
|
settings = cls.get_settings()
|
|
if not settings:
|
|
# Return default values if settings can't be retrieved
|
|
return {
|
|
'max_rooms': 10,
|
|
'max_conversations': 10,
|
|
'max_storage': 10737418240,
|
|
'current_rooms': 0,
|
|
'current_conversations': 0,
|
|
'current_storage': 0,
|
|
'rooms_percentage': 0,
|
|
'conversations_percentage': 0,
|
|
'storage_percentage': 0
|
|
}
|
|
|
|
total_rooms = Room.query.count()
|
|
total_conversations = Conversation.query.count()
|
|
total_storage = db.session.query(db.func.sum(RoomFile.size)).filter(RoomFile.deleted == False).scalar() or 0
|
|
|
|
return {
|
|
'max_rooms': settings.max_rooms,
|
|
'max_conversations': settings.max_conversations,
|
|
'max_storage': settings.max_storage,
|
|
'current_rooms': total_rooms,
|
|
'current_conversations': total_conversations,
|
|
'current_storage': total_storage,
|
|
'rooms_percentage': (total_rooms / settings.max_rooms) * 100 if settings.max_rooms > 0 else 0,
|
|
'conversations_percentage': (total_conversations / settings.max_conversations) * 100 if settings.max_conversations > 0 else 0,
|
|
'storage_percentage': (total_storage / settings.max_storage) * 100 if settings.max_storage > 0 else 0
|
|
}
|
|
|
|
class KeyValueSettings(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
key = db.Column(db.String(100), unique=True, nullable=False)
|
|
value = db.Column(db.Text)
|
|
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
|
|
|
@classmethod
|
|
def get_value(cls, key, default=None):
|
|
setting = cls.query.filter_by(key=key).first()
|
|
if setting:
|
|
try:
|
|
return json.loads(setting.value)
|
|
except (json.JSONDecodeError, TypeError):
|
|
return setting.value
|
|
return default
|
|
|
|
@classmethod
|
|
def set_value(cls, key, value):
|
|
setting = cls.query.filter_by(key=key).first()
|
|
if not setting:
|
|
setting = cls(key=key)
|
|
|
|
if isinstance(value, (dict, list)):
|
|
setting.value = json.dumps(value)
|
|
else:
|
|
setting.value = str(value)
|
|
|
|
db.session.add(setting)
|
|
db.session.commit()
|
|
return setting
|
|
|
|
class Conversation(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(100), nullable=False)
|
|
description = db.Column(db.Text)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
created_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
|
|
|
|
# Relationships
|
|
creator = db.relationship('User', backref=db.backref('created_conversations', cascade='all, delete-orphan'), foreign_keys=[created_by])
|
|
members = db.relationship('User', secondary=conversation_members, backref=db.backref('conversations', lazy='dynamic'))
|
|
messages = db.relationship('Message', back_populates='conversation', cascade='all, delete-orphan')
|
|
|
|
def __repr__(self):
|
|
return f'<Conversation {self.name}>'
|
|
|
|
class Message(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
content = db.Column(db.Text, nullable=False)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
conversation_id = db.Column(db.Integer, db.ForeignKey('conversation.id'), nullable=False)
|
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
|
|
|
|
# Relationships
|
|
conversation = db.relationship('Conversation', back_populates='messages')
|
|
user = db.relationship('User', backref=db.backref('messages', cascade='all, delete-orphan'))
|
|
attachments = db.relationship('MessageAttachment', back_populates='message', cascade='all, delete-orphan')
|
|
|
|
def __repr__(self):
|
|
return f'<Message {self.id}>'
|
|
|
|
class MessageAttachment(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
message_id = db.Column(db.Integer, db.ForeignKey('message.id'), nullable=False)
|
|
name = db.Column(db.String(255), nullable=False)
|
|
path = db.Column(db.String(512), nullable=False)
|
|
type = db.Column(db.String(100))
|
|
size = db.Column(db.Integer) # Size in bytes
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
|
# Relationships
|
|
message = db.relationship('Message', back_populates='attachments')
|
|
|
|
def __repr__(self):
|
|
return f'<MessageAttachment {self.name}>'
|
|
|
|
class EventType(Enum):
|
|
# User events
|
|
USER_LOGIN = 'user_login'
|
|
USER_LOGOUT = 'user_logout'
|
|
USER_CREATE = 'user_create'
|
|
USER_UPDATE = 'user_update'
|
|
USER_DELETE = 'user_delete'
|
|
|
|
# Room events
|
|
ROOM_CREATE = 'room_create'
|
|
ROOM_UPDATE = 'room_update'
|
|
ROOM_DELETE = 'room_delete'
|
|
ROOM_MEMBER_ADD = 'room_member_add'
|
|
ROOM_MEMBER_REMOVE = 'room_member_remove'
|
|
ROOM_PERMISSION_UPDATE = 'room_permission_update'
|
|
|
|
# File events
|
|
FILE_UPLOAD = 'file_upload'
|
|
FILE_DOWNLOAD = 'file_download'
|
|
FILE_DELETE = 'file_delete'
|
|
FILE_RENAME = 'file_rename'
|
|
FILE_MOVE = 'file_move'
|
|
FILE_STAR = 'file_star'
|
|
FILE_UNSTAR = 'file_unstar'
|
|
|
|
# Conversation events
|
|
CONVERSATION_CREATE = 'conversation_create'
|
|
CONVERSATION_UPDATE = 'conversation_update'
|
|
CONVERSATION_DELETE = 'conversation_delete'
|
|
CONVERSATION_MEMBER_ADD = 'conversation_member_add'
|
|
CONVERSATION_MEMBER_REMOVE = 'conversation_member_remove'
|
|
CONVERSATION_OPEN = 'conversation_open'
|
|
|
|
# Message events
|
|
MESSAGE_CREATE = 'message_create'
|
|
MESSAGE_UPDATE = 'message_update'
|
|
MESSAGE_DELETE = 'message_delete'
|
|
MESSAGE_ATTACHMENT_ADD = 'message_attachment_add'
|
|
MESSAGE_ATTACHMENT_REMOVE = 'message_attachment_remove'
|
|
|
|
# Settings events
|
|
SETTINGS_UPDATE = 'settings_update'
|
|
|
|
class Event(db.Model):
|
|
__tablename__ = 'events'
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
event_type = db.Column(db.String(50), nullable=False)
|
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=True)
|
|
timestamp = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
|
details = db.Column(db.JSON) # Store additional event-specific data
|
|
ip_address = db.Column(db.String(45)) # IPv6 addresses can be up to 45 chars
|
|
user_agent = db.Column(db.String(255))
|
|
|
|
# Relationships
|
|
user = db.relationship('User', backref=db.backref('events', cascade='all, delete-orphan'))
|
|
|
|
def __repr__(self):
|
|
return f'<Event {self.event_type} by User {self.user_id} at {self.timestamp}>'
|
|
|
|
class NotifType(Enum):
|
|
# User notifications
|
|
ACCOUNT_CREATED = 'account_created'
|
|
PASSWORD_RESET = 'password_reset'
|
|
ACCOUNT_DELETED = 'account_deleted'
|
|
ACCOUNT_UPDATED = 'account_updated'
|
|
|
|
# Room notifications
|
|
ROOM_INVITE = 'room_invite'
|
|
ROOM_INVITE_REMOVED = 'room_invite_removed'
|
|
|
|
# Conversation notifications
|
|
CONVERSATION_INVITE = 'conversation_invite'
|
|
CONVERSATION_INVITE_REMOVED = 'conversation_invite_removed'
|
|
CONVERSATION_MESSAGE = 'conversation_message'
|
|
|
|
class Notif(db.Model):
|
|
__tablename__ = 'notifs'
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
notif_type = db.Column(db.String(50), nullable=False)
|
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
|
|
sender_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=True)
|
|
timestamp = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
|
|
read = db.Column(db.Boolean, default=False, nullable=False)
|
|
details = db.Column(db.JSON) # Store additional notification-specific data
|
|
|
|
# Relationships
|
|
user = db.relationship('User', foreign_keys=[user_id], backref=db.backref('notifications', cascade='all, delete-orphan'))
|
|
sender = db.relationship('User', foreign_keys=[sender_id], backref='sent_notifications')
|
|
|
|
def __repr__(self):
|
|
return f'<Notif {self.notif_type} for User {self.user_id} at {self.timestamp}>'
|
|
|
|
class EmailTemplate(db.Model):
|
|
__tablename__ = 'email_templates'
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(100), nullable=False)
|
|
subject = db.Column(db.String(200), nullable=False)
|
|
body = db.Column(db.Text, nullable=False)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
|
created_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
|
|
is_active = db.Column(db.Boolean, default=True)
|
|
|
|
# Relationships
|
|
creator = db.relationship('User', backref=db.backref('created_email_templates', cascade='all, delete-orphan'), foreign_keys=[created_by])
|
|
|
|
def __repr__(self):
|
|
return f'<EmailTemplate {self.name}>'
|
|
|
|
class Mail(db.Model):
|
|
__tablename__ = 'mails'
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
recipient = db.Column(db.String(150), nullable=False)
|
|
subject = db.Column(db.String(200), nullable=False)
|
|
body = db.Column(db.Text, nullable=False)
|
|
status = db.Column(db.String(20), default='pending', nullable=False) # e.g., pending, sent, failed
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
sent_at = db.Column(db.DateTime, nullable=True)
|
|
template_id = db.Column(db.Integer, db.ForeignKey('email_templates.id'), nullable=True)
|
|
notif_id = db.Column(db.Integer, db.ForeignKey('notifs.id'), nullable=True)
|
|
|
|
# Relationships
|
|
template = db.relationship('EmailTemplate', backref='mails')
|
|
notif = db.relationship('Notif', backref='mails')
|
|
|
|
def __repr__(self):
|
|
return f'<Mail to {self.recipient} status={self.status}>'
|
|
|
|
class PasswordSetupToken(db.Model):
|
|
__tablename__ = 'password_setup_tokens'
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
|
|
token = db.Column(db.String(100), unique=True, nullable=False)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
expires_at = db.Column(db.DateTime, nullable=False)
|
|
used = db.Column(db.Boolean, default=False)
|
|
|
|
# Relationships
|
|
user = db.relationship('User', backref=db.backref('password_setup_tokens', cascade='all, delete-orphan'))
|
|
|
|
def is_valid(self):
|
|
return not self.used and datetime.utcnow() < self.expires_at
|
|
|
|
def __repr__(self):
|
|
return f'<PasswordSetupToken {self.token}>'
|
|
|
|
class PasswordResetToken(db.Model):
|
|
__tablename__ = 'password_reset_tokens'
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
|
|
token = db.Column(db.String(100), unique=True, nullable=False)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
expires_at = db.Column(db.DateTime, nullable=False)
|
|
used = db.Column(db.Boolean, default=False)
|
|
ip_address = db.Column(db.String(45)) # Store IP address for security
|
|
|
|
# Relationships
|
|
user = db.relationship('User', backref=db.backref('password_reset_tokens', cascade='all, delete-orphan'))
|
|
|
|
def is_valid(self):
|
|
return not self.used and datetime.utcnow() < self.expires_at
|
|
|
|
def __repr__(self):
|
|
return f'<PasswordResetToken {self.token}>'
|
|
|
|
def user_has_permission(room, perm_name):
|
|
"""
|
|
Check if the current user has a specific permission in a room.
|
|
|
|
Args:
|
|
room: Room object
|
|
perm_name: Name of the permission to check (e.g., 'can_view', 'can_upload')
|
|
|
|
Returns:
|
|
bool: True if user has permission, False otherwise
|
|
"""
|
|
# Admin and manager users have all permissions
|
|
if current_user.is_admin or current_user.is_manager:
|
|
return True
|
|
|
|
# Check if user is a member of the room
|
|
if current_user not in room.members:
|
|
return False
|
|
|
|
# Get user's permissions for this room
|
|
permission = RoomMemberPermission.query.filter_by(
|
|
room_id=room.id,
|
|
user_id=current_user.id
|
|
).first()
|
|
|
|
# If no specific permissions are set, user only has view access
|
|
if not permission:
|
|
return perm_name == 'can_view'
|
|
|
|
# Check the specific permission
|
|
return getattr(permission, perm_name, False)
|
|
|
|
class ManagementAPIKey(db.Model):
|
|
__tablename__ = 'management_api_keys'
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
api_key = db.Column(db.String(100), unique=True, nullable=False)
|
|
name = db.Column(db.String(100), nullable=False) # Name/description of the management tool
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
last_used_at = db.Column(db.DateTime)
|
|
is_active = db.Column(db.Boolean, default=True)
|
|
created_by = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='SET NULL'))
|
|
|
|
# Relationships
|
|
creator = db.relationship('User', backref=db.backref('created_api_keys', cascade='all, delete-orphan'))
|
|
|
|
def __repr__(self):
|
|
return f'<ManagementAPIKey {self.name}>'
|
|
|
|
class Instance(db.Model):
|
|
__tablename__ = 'instances'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(100), unique=True, nullable=False)
|
|
company = db.Column(db.String(100), nullable=False)
|
|
rooms_count = db.Column(db.Integer, nullable=False, default=0)
|
|
conversations_count = db.Column(db.Integer, nullable=False, default=0)
|
|
data_size = db.Column(db.Float, nullable=False, default=0.0)
|
|
payment_plan = db.Column(db.String(20), nullable=False, default='Basic')
|
|
main_url = db.Column(db.String(255), unique=True, nullable=False)
|
|
status = db.Column(db.String(20), nullable=False, default='inactive')
|
|
status_details = db.Column(db.Text, nullable=True)
|
|
connection_token = db.Column(db.String(64), unique=True, nullable=True)
|
|
created_at = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP'))
|
|
updated_at = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP'), onupdate=db.text('CURRENT_TIMESTAMP'))
|
|
|
|
def __repr__(self):
|
|
return f'<Instance {self.name}>' |