Files
docupulse/models.py
2025-06-26 15:15:16 +02:00

722 lines
33 KiB
Python

from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime
from sqlalchemy.orm import relationship
from extensions import db
from enum import Enum
import json
# Many-to-many link between rooms and users. Both columns together form the
# primary key, so a given (room, user) pair can be stored only once.
room_members = db.Table(
    'room_members',
    db.Column(
        'room_id',
        db.Integer,
        db.ForeignKey('room.id'),
        primary_key=True,
    ),
    db.Column(
        'user_id',
        db.Integer,
        db.ForeignKey('user.id'),
        primary_key=True,
    ),
)
# Many-to-many link between conversations and users. Both columns together
# form the primary key, so a (conversation, user) pair can be stored only once.
conversation_members = db.Table(
    'conversation_members',
    db.Column(
        'conversation_id',
        db.Integer,
        db.ForeignKey('conversation.id'),
        primary_key=True,
    ),
    db.Column(
        'user_id',
        db.Integer,
        db.ForeignKey('user.id'),
        primary_key=True,
    ),
)
class User(UserMixin, db.Model):
    """Application account: credentials, role flags and profile fields.

    Only a password hash is stored (see ``set_password``); the column is
    nullable, so an account may exist without any password set.
    """

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    last_name = db.Column(db.String(150), nullable=False, default='--')
    email = db.Column(db.String(150), unique=True, nullable=False)
    password_hash = db.Column(db.String(256))

    # Role flags
    is_admin = db.Column(db.Boolean, default=False)
    is_manager = db.Column(db.Boolean, default=False)  # Manager role

    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Optional profile information
    phone = db.Column(db.String(20))
    company = db.Column(db.String(100))
    position = db.Column(db.String(100))
    notes = db.Column(db.Text)
    is_active = db.Column(db.Boolean, default=True)
    profile_picture = db.Column(db.String(255))
    preferred_view = db.Column(db.String(10), default='grid', nullable=False)  # 'grid' or 'list'

    # Per-room permission rows; they are deleted together with the user.
    room_permissions = relationship(
        'RoomMemberPermission',
        back_populates='user',
        cascade='all, delete-orphan'
    )

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True if ``password`` matches the stored hash.

        Returns False (instead of raising) when no hash has been stored yet,
        since ``password_hash`` is a nullable column.
        """
        if not self.password_hash:
            return False
        return check_password_hash(self.password_hash, password)

    def __repr__(self):
        return f'<User {self.username}>'
class Room(db.Model):
    """A named room that owns files and has a set of member users."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    created_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )

    # User who created the room; exposed on User as ``created_rooms``.
    creator = db.relationship(
        'User',
        backref=db.backref('created_rooms', cascade='all, delete-orphan'),
        foreign_keys=[created_by],
    )
    # Members via the room_members table; exposed on User as the dynamic
    # ``rooms`` collection.
    members = db.relationship(
        'User',
        secondary=room_members,
        backref=db.backref('rooms', lazy='dynamic'),
    )
    # Permission rows and file records are removed together with the room.
    member_permissions = relationship(
        'RoomMemberPermission',
        back_populates='room',
        cascade='all, delete-orphan',
    )
    files = db.relationship(
        'RoomFile',
        back_populates='room',
        cascade='all, delete-orphan',
    )

    def __repr__(self):
        return f'<Room {self.name}>'
# Association object between a room and a user that carries permission flags
class RoomMemberPermission(db.Model):
    """Permission flags of one user in one room.

    The primary key is the (room_id, user_id) pair. ``can_view`` defaults to
    True; every other flag defaults to False.
    """

    __tablename__ = 'room_member_permissions'

    room_id = db.Column(
        db.Integer,
        db.ForeignKey('room.id'),
        primary_key=True,
    )
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        primary_key=True,
    )

    # Permission flags
    can_view = db.Column(db.Boolean, default=True, nullable=False)
    can_download = db.Column(db.Boolean, default=False, nullable=False)
    can_upload = db.Column(db.Boolean, default=False, nullable=False)
    can_delete = db.Column(db.Boolean, default=False, nullable=False)
    can_rename = db.Column(db.Boolean, default=False, nullable=False)
    can_move = db.Column(db.Boolean, default=False, nullable=False)
    can_share = db.Column(db.Boolean, default=False, nullable=False)

    # Both ends of the association
    user = relationship('User', back_populates='room_permissions')
    room = relationship('Room', back_populates='member_permissions')
class RoomFile(db.Model):
    """A file or folder entry inside a room, including soft-delete fields."""

    __tablename__ = 'room_file'

    id = db.Column(db.Integer, primary_key=True)
    room_id = db.Column(db.Integer, db.ForeignKey('room.id'), nullable=False)
    name = db.Column(db.String(255), nullable=False)
    path = db.Column(db.String(255), nullable=False, default='')
    type = db.Column(db.String(10), nullable=False)  # 'file' or 'folder'
    size = db.Column(db.Integer)  # bytes; null for folders
    modified = db.Column(db.Float)  # timestamp

    # Upload bookkeeping
    uploaded_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
    )
    uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Deletion bookkeeping: flag, who deleted the entry, and when
    deleted = db.Column(db.Boolean, default=False)
    deleted_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
    )
    deleted_at = db.Column(db.DateTime)

    # Two separate links to User, told apart by their foreign key column.
    uploader = db.relationship(
        'User',
        backref=db.backref('uploaded_files', cascade='all, delete-orphan'),
        foreign_keys=[uploaded_by],
    )
    deleter = db.relationship(
        'User',
        backref=db.backref('deleted_room_files', cascade='all, delete-orphan'),
        foreign_keys=[deleted_by],
    )
    room = db.relationship('Room', back_populates='files')
    # Users who starred this entry, through the user_starred_file table.
    starred_by = db.relationship(
        'User',
        secondary='user_starred_file',
        backref='starred_files',
    )

    def __repr__(self):
        return f'<RoomFile {self.name} ({self.type}) in {self.path}>'
class UserStarredFile(db.Model):
    """Records that a user starred a room file, and when."""

    __tablename__ = 'user_starred_file'

    # A user can star a given file only once.
    __table_args__ = (
        db.UniqueConstraint('user_id', 'file_id', name='unique_user_file_star'),
    )

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )
    file_id = db.Column(
        db.Integer,
        db.ForeignKey('room_file.id'),
        nullable=False,
    )
    starred_at = db.Column(db.DateTime, default=datetime.utcnow)

    def __repr__(self):
        return f'<UserStarredFile user_id={self.user_id} file_id={self.file_id}>'
class TrashedFile(db.Model):
    """A trashed file or folder entry, keeping the path it came from."""

    __tablename__ = 'trashed_file'

    id = db.Column(db.Integer, primary_key=True)
    room_id = db.Column(db.Integer, db.ForeignKey('room.id'), nullable=False)
    name = db.Column(db.String(255), nullable=False)
    original_path = db.Column(db.String(255), nullable=False, default='')
    type = db.Column(db.String(10), nullable=False)  # 'file' or 'folder'
    size = db.Column(db.Integer)  # bytes; null for folders
    modified = db.Column(db.Float)  # timestamp

    # Upload bookkeeping
    uploaded_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
    )
    uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Deletion bookkeeping; the deleting user is mandatory here.
    deleted_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )
    deleted_at = db.Column(db.DateTime, default=datetime.utcnow)

    room = db.relationship('Room', backref='trashed_files')
    # Two separate links to User, told apart by their foreign key column.
    uploader = db.relationship(
        'User',
        foreign_keys=[uploaded_by],
        backref=db.backref('uploaded_trashed_files', cascade='all, delete-orphan'),
    )
    deleter = db.relationship(
        'User',
        foreign_keys=[deleted_by],
        backref=db.backref('deleted_trashed_files', cascade='all, delete-orphan'),
    )

    def __repr__(self):
        return f'<TrashedFile {self.name} ({self.type}) from {self.original_path}>'
class SiteSettings(db.Model):
    """Branding colours and company details, read through ``get_settings``."""

    id = db.Column(db.Integer, primary_key=True)

    # Theme colours as '#rrggbb' strings
    primary_color = db.Column(db.String(7), default='#16767b')  # Default from colors.css
    secondary_color = db.Column(db.String(7), default='#741b5f')  # Default from colors.css

    # Company profile
    company_name = db.Column(db.String(100))
    company_logo = db.Column(db.String(255))  # Filename of the logo
    company_website = db.Column(db.String(200))
    company_email = db.Column(db.String(100))
    company_phone = db.Column(db.String(20))
    company_address = db.Column(db.String(200))
    company_city = db.Column(db.String(100))
    company_state = db.Column(db.String(100))
    company_zip = db.Column(db.String(20))
    company_country = db.Column(db.String(100))
    company_description = db.Column(db.Text)
    company_industry = db.Column(db.String(100))

    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    @classmethod
    def get_settings(cls):
        """Return the first settings row, creating and committing one if none exists."""
        existing = cls.query.first()
        if existing:
            return existing

        # No row yet: persist one that relies on the column defaults.
        created = cls()
        db.session.add(created)
        db.session.commit()
        return created
class DocuPulseSettings(db.Model):
    """Limits for rooms, conversations and storage, plus usage reporting."""

    __tablename__ = 'docupulse_settings'

    id = db.Column(db.Integer, primary_key=True)
    max_rooms = db.Column(db.Integer, default=10)
    max_conversations = db.Column(db.Integer, default=10)
    max_storage = db.Column(db.BigInteger, default=10737418240)  # 10GB in bytes
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    @classmethod
    def get_settings(cls):
        """Return the first settings row, creating it with defaults if missing.

        Returns:
            The settings row, or None when the lookup or creation fails. On
            failure the session is rolled back and the error is logged.
        """
        import logging

        try:
            settings = cls.query.first()
            if not settings:
                settings = cls(
                    max_rooms=10,
                    max_conversations=10,
                    max_storage=10737418240  # 10GB in bytes
                )
                db.session.add(settings)
                db.session.commit()
            return settings
        except Exception:
            # Deliberate best-effort (e.g. integer overflow in the database):
            # callers get None, but the failure is logged rather than hidden.
            db.session.rollback()
            logging.getLogger(__name__).exception(
                'Could not load or create DocuPulseSettings'
            )
            return None

    @staticmethod
    def _percentage(current, maximum):
        """Return ``current`` as a percentage of ``maximum``.

        Yields 0 when ``maximum`` is unset (None) or not positive, so a NULL
        limit column cannot raise during the comparison.
        """
        if not maximum or maximum <= 0:
            return 0
        return (current / maximum) * 100

    @classmethod
    def get_usage_stats(cls):
        """Return a dict of configured limits, current usage and percentages.

        Falls back to the default limits with zero usage when the settings
        row cannot be retrieved.
        """
        settings = cls.get_settings()
        if not settings:
            # Return default values if settings can't be retrieved
            return {
                'max_rooms': 10,
                'max_conversations': 10,
                'max_storage': 10737418240,
                'current_rooms': 0,
                'current_conversations': 0,
                'current_storage': 0,
                'rooms_percentage': 0,
                'conversations_percentage': 0,
                'storage_percentage': 0
            }

        total_rooms = Room.query.count()
        total_conversations = Conversation.query.count()
        # Sum the sizes of room files whose deleted flag is False; 0 if none.
        total_storage = db.session.query(db.func.sum(RoomFile.size)).filter(RoomFile.deleted == False).scalar() or 0

        return {
            'max_rooms': settings.max_rooms,
            'max_conversations': settings.max_conversations,
            'max_storage': settings.max_storage,
            'current_rooms': total_rooms,
            'current_conversations': total_conversations,
            'current_storage': total_storage,
            'rooms_percentage': cls._percentage(total_rooms, settings.max_rooms),
            'conversations_percentage': cls._percentage(total_conversations, settings.max_conversations),
            'storage_percentage': cls._percentage(total_storage, settings.max_storage)
        }
class KeyValueSettings(db.Model):
    """Generic key/value store; values are kept as text."""

    id = db.Column(db.Integer, primary_key=True)
    key = db.Column(db.String(100), unique=True, nullable=False)
    value = db.Column(db.Text)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    @classmethod
    def get_value(cls, key, default=None):
        """Return the value stored under ``key``, or ``default`` if there is no row.

        The stored text is parsed as JSON when possible; if parsing fails the
        raw stored value is returned unchanged.
        """
        row = cls.query.filter_by(key=key).first()
        if not row:
            return default

        try:
            return json.loads(row.value)
        except (json.JSONDecodeError, TypeError):
            return row.value

    @classmethod
    def set_value(cls, key, value):
        """Store ``value`` under ``key``, commit, and return the row.

        Dicts and lists are serialised with ``json.dumps``; any other value is
        stored as ``str(value)``.
        """
        row = cls.query.filter_by(key=key).first()
        if not row:
            row = cls(key=key)

        row.value = json.dumps(value) if isinstance(value, (dict, list)) else str(value)

        db.session.add(row)
        db.session.commit()
        return row
class Conversation(db.Model):
    """A named conversation with member users and messages."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    created_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )

    # User who created the conversation; exposed on User as
    # ``created_conversations``.
    creator = db.relationship(
        'User',
        backref=db.backref('created_conversations', cascade='all, delete-orphan'),
        foreign_keys=[created_by],
    )
    # Members via the conversation_members table; exposed on User as the
    # dynamic ``conversations`` collection.
    members = db.relationship(
        'User',
        secondary=conversation_members,
        backref=db.backref('conversations', lazy='dynamic'),
    )
    # Messages are removed together with the conversation.
    messages = db.relationship(
        'Message',
        back_populates='conversation',
        cascade='all, delete-orphan',
    )

    def __repr__(self):
        return f'<Conversation {self.name}>'
class Message(db.Model):
    """A message written by a user inside a conversation."""

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    conversation_id = db.Column(
        db.Integer,
        db.ForeignKey('conversation.id'),
        nullable=False,
    )
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )

    conversation = db.relationship('Conversation', back_populates='messages')
    # Author of the message; exposed on User as ``messages``.
    user = db.relationship(
        'User',
        backref=db.backref('messages', cascade='all, delete-orphan'),
    )
    # Attachments are removed together with the message.
    attachments = db.relationship(
        'MessageAttachment',
        back_populates='message',
        cascade='all, delete-orphan',
    )

    def __repr__(self):
        return f'<Message {self.id}>'
class MessageAttachment(db.Model):
    """Metadata for a file attached to a message."""

    id = db.Column(db.Integer, primary_key=True)
    message_id = db.Column(
        db.Integer,
        db.ForeignKey('message.id'),
        nullable=False,
    )
    name = db.Column(db.String(255), nullable=False)
    path = db.Column(db.String(512), nullable=False)
    type = db.Column(db.String(100))
    size = db.Column(db.Integer)  # bytes
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Owning message
    message = db.relationship('Message', back_populates='attachments')

    def __repr__(self):
        return f'<MessageAttachment {self.name}>'
class EventType(Enum):
    """Event identifiers, grouped by the kind of object they concern.

    Each member's value is the lower-case form of its name.
    """

    # --- Users ---
    USER_LOGIN = 'user_login'
    USER_LOGOUT = 'user_logout'
    USER_CREATE = 'user_create'
    USER_UPDATE = 'user_update'
    USER_DELETE = 'user_delete'

    # --- Rooms ---
    ROOM_CREATE = 'room_create'
    ROOM_UPDATE = 'room_update'
    ROOM_DELETE = 'room_delete'
    ROOM_MEMBER_ADD = 'room_member_add'
    ROOM_MEMBER_REMOVE = 'room_member_remove'
    ROOM_PERMISSION_UPDATE = 'room_permission_update'

    # --- Files ---
    FILE_UPLOAD = 'file_upload'
    FILE_DOWNLOAD = 'file_download'
    FILE_DELETE = 'file_delete'
    FILE_RENAME = 'file_rename'
    FILE_MOVE = 'file_move'
    FILE_STAR = 'file_star'
    FILE_UNSTAR = 'file_unstar'

    # --- Conversations ---
    CONVERSATION_CREATE = 'conversation_create'
    CONVERSATION_UPDATE = 'conversation_update'
    CONVERSATION_DELETE = 'conversation_delete'
    CONVERSATION_MEMBER_ADD = 'conversation_member_add'
    CONVERSATION_MEMBER_REMOVE = 'conversation_member_remove'
    CONVERSATION_OPEN = 'conversation_open'

    # --- Messages ---
    MESSAGE_CREATE = 'message_create'
    MESSAGE_UPDATE = 'message_update'
    MESSAGE_DELETE = 'message_delete'
    MESSAGE_ATTACHMENT_ADD = 'message_attachment_add'
    MESSAGE_ATTACHMENT_REMOVE = 'message_attachment_remove'

    # --- Settings ---
    SETTINGS_UPDATE = 'settings_update'
class Event(db.Model):
    """A logged event: its type, the optional acting user and request metadata."""

    __tablename__ = 'events'

    id = db.Column(db.Integer, primary_key=True)
    event_type = db.Column(db.String(50), nullable=False)
    # Nullable, so an event can be stored without an associated user.
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=True,
    )
    timestamp = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    details = db.Column(db.JSON)  # Additional event-specific data
    ip_address = db.Column(db.String(45))  # IPv6 addresses can be up to 45 chars
    user_agent = db.Column(db.String(255))

    # Acting user; exposed on User as ``events``.
    user = db.relationship(
        'User',
        backref=db.backref('events', cascade='all, delete-orphan'),
    )

    def __repr__(self):
        return f'<Event {self.event_type} by User {self.user_id} at {self.timestamp}>'
class NotifType(Enum):
    """Notification identifiers; each value is the lower-case member name."""

    # --- Accounts ---
    ACCOUNT_CREATED = 'account_created'
    PASSWORD_RESET = 'password_reset'
    ACCOUNT_DELETED = 'account_deleted'
    ACCOUNT_UPDATED = 'account_updated'

    # --- Rooms ---
    ROOM_INVITE = 'room_invite'
    ROOM_INVITE_REMOVED = 'room_invite_removed'

    # --- Conversations ---
    CONVERSATION_INVITE = 'conversation_invite'
    CONVERSATION_INVITE_REMOVED = 'conversation_invite_removed'
    CONVERSATION_MESSAGE = 'conversation_message'
class Notif(db.Model):
    """A notification for one user, optionally attributed to a sending user."""

    __tablename__ = 'notifs'

    id = db.Column(db.Integer, primary_key=True)
    notif_type = db.Column(db.String(50), nullable=False)
    # Recipient (required) and sender (optional)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )
    sender_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=True,
    )
    timestamp = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    read = db.Column(db.Boolean, default=False, nullable=False)
    details = db.Column(db.JSON)  # Additional notification-specific data

    # Two links to User, told apart by their foreign key column.
    user = db.relationship(
        'User',
        foreign_keys=[user_id],
        backref=db.backref('notifications', cascade='all, delete-orphan'),
    )
    sender = db.relationship(
        'User',
        foreign_keys=[sender_id],
        backref='sent_notifications',
    )

    def __repr__(self):
        return f'<Notif {self.notif_type} for User {self.user_id} at {self.timestamp}>'
class EmailTemplate(db.Model):
    """A stored e-mail template with a subject and a body."""

    __tablename__ = 'email_templates'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    subject = db.Column(db.String(200), nullable=False)
    body = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )
    created_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )
    is_active = db.Column(db.Boolean, default=True)

    # Authoring user; exposed on User as ``created_email_templates``.
    creator = db.relationship(
        'User',
        backref=db.backref('created_email_templates', cascade='all, delete-orphan'),
        foreign_keys=[created_by],
    )

    def __repr__(self):
        return f'<EmailTemplate {self.name}>'
class Mail(db.Model):
    """An e-mail record with a delivery status and optional template/notification links."""

    __tablename__ = 'mails'

    id = db.Column(db.Integer, primary_key=True)
    recipient = db.Column(db.String(150), nullable=False)
    subject = db.Column(db.String(200), nullable=False)
    body = db.Column(db.Text, nullable=False)
    status = db.Column(db.String(20), default='pending', nullable=False)  # e.g., pending, sent, failed
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    sent_at = db.Column(db.DateTime, nullable=True)

    # Optional links to the template and the notification for this mail
    template_id = db.Column(
        db.Integer,
        db.ForeignKey('email_templates.id'),
        nullable=True,
    )
    notif_id = db.Column(
        db.Integer,
        db.ForeignKey('notifs.id'),
        nullable=True,
    )

    template = db.relationship('EmailTemplate', backref='mails')
    notif = db.relationship('Notif', backref='mails')

    def __repr__(self):
        return f'<Mail to {self.recipient} status={self.status}>'
class PasswordSetupToken(db.Model):
    """A unique token with an expiry time, tied to one user, for password setup."""

    __tablename__ = 'password_setup_tokens'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
    token = db.Column(db.String(100), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    expires_at = db.Column(db.DateTime, nullable=False)
    used = db.Column(db.Boolean, default=False)

    # Owning user; exposed on User as ``password_setup_tokens``.
    user = db.relationship('User', backref=db.backref('password_setup_tokens', cascade='all, delete-orphan'))

    def is_valid(self):
        """Return True while the token is unused and ``expires_at`` is still in the future."""
        return not self.used and datetime.utcnow() < self.expires_at

    def __repr__(self):
        # The token value is a secret credential: keep it out of the repr so
        # it cannot end up in logs or tracebacks.
        return f'<PasswordSetupToken id={self.id} user_id={self.user_id} used={self.used}>'
class PasswordResetToken(db.Model):
    """A unique token with an expiry time, tied to one user, for password reset."""

    __tablename__ = 'password_reset_tokens'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='CASCADE'), nullable=False)
    token = db.Column(db.String(100), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    expires_at = db.Column(db.DateTime, nullable=False)
    used = db.Column(db.Boolean, default=False)
    ip_address = db.Column(db.String(45))  # Store IP address for security

    # Owning user; exposed on User as ``password_reset_tokens``.
    user = db.relationship('User', backref=db.backref('password_reset_tokens', cascade='all, delete-orphan'))

    def is_valid(self):
        """Return True while the token is unused and ``expires_at`` is still in the future."""
        return not self.used and datetime.utcnow() < self.expires_at

    def __repr__(self):
        # The token value is a secret credential: keep it out of the repr so
        # it cannot end up in logs or tracebacks.
        return f'<PasswordResetToken id={self.id} user_id={self.user_id} used={self.used}>'
def user_has_permission(room, perm_name):
    """
    Check if the current user has a specific permission in a room.

    Args:
        room: Room object
        perm_name: Name of the permission to check (e.g., 'can_view', 'can_upload')

    Returns:
        bool: True if user has permission, False otherwise
    """
    # The module-level imports only bring in UserMixin from flask_login, so
    # current_user is imported here; without it every call raised NameError.
    from flask_login import current_user

    # Admin and manager users have all permissions. getattr keeps this safe
    # for a current_user object that does not define the role flags.
    if getattr(current_user, 'is_admin', False) or getattr(current_user, 'is_manager', False):
        return True

    # Check if user is a member of the room
    if current_user not in room.members:
        return False

    # Get user's permissions for this room
    permission = RoomMemberPermission.query.filter_by(
        room_id=room.id,
        user_id=current_user.id
    ).first()

    # If no specific permissions are set, user only has view access
    if not permission:
        return perm_name == 'can_view'

    # Check the specific permission; unknown names count as not granted and
    # the result is always a real bool, as documented.
    return bool(getattr(permission, perm_name, False))
class ManagementAPIKey(db.Model):
    """A named, unique API key with an active flag and usage timestamps."""

    __tablename__ = 'management_api_keys'

    id = db.Column(db.Integer, primary_key=True)
    api_key = db.Column(db.String(100), unique=True, nullable=False)
    name = db.Column(db.String(100), nullable=False)  # Name/description of the management tool
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_used_at = db.Column(db.DateTime)
    is_active = db.Column(db.Boolean, default=True)
    created_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='SET NULL'),
    )

    # NOTE(review): the foreign key above uses ondelete='SET NULL' while this
    # backref cascades 'all, delete-orphan' - confirm which behaviour is
    # intended when the creating user is deleted.
    creator = db.relationship(
        'User',
        backref=db.backref('created_api_keys', cascade='all, delete-orphan'),
    )

    def __repr__(self):
        return f'<ManagementAPIKey {self.name}>'
class Instance(db.Model):
    """A tracked instance: identity, usage counters, status and version data."""

    __tablename__ = 'instances'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), unique=True, nullable=False)
    company = db.Column(db.String(100), nullable=False)

    # Usage counters
    rooms_count = db.Column(db.Integer, nullable=False, default=0)
    conversations_count = db.Column(db.Integer, nullable=False, default=0)
    data_size = db.Column(db.Float, nullable=False, default=0.0)

    payment_plan = db.Column(db.String(20), nullable=False, default='Basic')
    main_url = db.Column(db.String(255), unique=True, nullable=False)
    status = db.Column(db.String(20), nullable=False, default='inactive')
    status_details = db.Column(db.Text, nullable=True)
    connection_token = db.Column(db.String(64), unique=True, nullable=True)

    # Portainer integration
    portainer_stack_id = db.Column(db.String(100), nullable=True)  # Portainer stack ID
    portainer_stack_name = db.Column(db.String(100), nullable=True)  # Portainer stack name

    # Version tracking
    deployed_version = db.Column(db.String(50), nullable=True)  # Deployed version (commit hash or tag)
    deployed_branch = db.Column(db.String(100), nullable=True)  # Branch that was deployed
    latest_version = db.Column(db.String(50), nullable=True)  # Latest version available in Git
    version_checked_at = db.Column(db.DateTime, nullable=True)  # When version was last checked

    # Timestamps are expressed as CURRENT_TIMESTAMP SQL text, not Python callables.
    created_at = db.Column(
        db.DateTime,
        nullable=False,
        server_default=db.text('CURRENT_TIMESTAMP'),
    )
    updated_at = db.Column(
        db.DateTime,
        nullable=False,
        server_default=db.text('CURRENT_TIMESTAMP'),
        onupdate=db.text('CURRENT_TIMESTAMP'),
    )

    def __repr__(self):
        return f'<Instance {self.name}>'
class HelpArticle(db.Model):
    """A help article that belongs to a category and can be published."""

    __tablename__ = 'help_articles'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    # One of: getting-started, user-management, file-management,
    # communication, security, administration
    category = db.Column(db.String(50), nullable=False)
    body = db.Column(db.Text, nullable=False)  # Rich text content
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )
    created_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )
    is_published = db.Column(db.Boolean, default=True)
    order_index = db.Column(db.Integer, default=0)  # Ordering within a category

    # Authoring user; exposed on User as ``created_help_articles``.
    creator = db.relationship(
        'User',
        backref=db.backref('created_help_articles', cascade='all, delete-orphan'),
        foreign_keys=[created_by],
    )

    def __repr__(self):
        return f'<HelpArticle {self.title} ({self.category})>'

    @classmethod
    def get_categories(cls):
        """Return a dict mapping each category key to its display name."""
        return {
            'getting-started': 'Getting Started',
            'user-management': 'User Management',
            'file-management': 'File Management',
            'communication': 'Communication',
            'security': 'Security & Privacy',
            'administration': 'Administration'
        }

    @classmethod
    def get_articles_by_category(cls, category, published_only=True):
        """Return the articles of ``category``, optionally only published ones.

        Results are ordered by ascending ``order_index``, then newest first.
        """
        matching = cls.query.filter_by(category=category)
        if published_only:
            matching = matching.filter_by(is_published=True)
        ordered = matching.order_by(cls.order_index.asc(), cls.created_at.desc())
        return ordered.all()

    @classmethod
    def get_all_published(cls):
        """Return published articles as a dict of category -> list of articles.

        Within each list the query order (ascending ``order_index``, then
        newest first) is preserved.
        """
        published = (
            cls.query
            .filter_by(is_published=True)
            .order_by(cls.order_index.asc(), cls.created_at.desc())
            .all()
        )
        by_category = {}
        for item in published:
            by_category.setdefault(item.category, []).append(item)
        return by_category
class PricingPlan(db.Model):
    """A pricing plan: prices, display options, Stripe identifiers and quotas.

    For every quota column the value 0 means "unlimited".
    """

    __tablename__ = 'pricing_plans'

    # (quota column name, display suffix) pairs understood by the quota
    # helper methods below.
    _QUOTA_SUFFIXES = (
        ('room_quota', ' rooms'),
        ('conversation_quota', ' conversations'),
        ('storage_quota_gb', 'GB'),
        ('manager_quota', ' managers'),
        ('admin_quota', ' admins'),
    )

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    monthly_price = db.Column(db.Float, nullable=False)
    annual_price = db.Column(db.Float, nullable=False)
    features = db.Column(db.JSON, nullable=False)  # List of feature strings
    is_popular = db.Column(db.Boolean, default=False)
    is_custom = db.Column(db.Boolean, default=False)
    button_text = db.Column(db.String(50), default='Get Started')
    button_url = db.Column(db.String(200), default='#')

    # Stripe integration
    stripe_product_id = db.Column(db.String(100), nullable=True)
    stripe_monthly_price_id = db.Column(db.String(100), nullable=True)
    stripe_annual_price_id = db.Column(db.String(100), nullable=True)

    # Deprecated: Stripe payment links (to be removed in a future migration)
    monthly_stripe_link = db.Column(db.String(500), nullable=True)  # DEPRECATED
    annual_stripe_link = db.Column(db.String(500), nullable=True)  # DEPRECATED

    order_index = db.Column(db.Integer, default=0)
    is_active = db.Column(db.Boolean, default=True)

    # Quotas (0 = unlimited)
    room_quota = db.Column(db.Integer, default=0)
    conversation_quota = db.Column(db.Integer, default=0)
    storage_quota_gb = db.Column(db.Integer, default=0)  # stored in GB
    manager_quota = db.Column(db.Integer, default=0)
    admin_quota = db.Column(db.Integer, default=0)

    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )
    created_by = db.Column(
        db.Integer,
        db.ForeignKey('user.id', ondelete='CASCADE'),
        nullable=False,
    )

    # Authoring user; exposed on User as ``created_pricing_plans``.
    creator = db.relationship(
        'User',
        backref=db.backref('created_pricing_plans', cascade='all, delete-orphan'),
        foreign_keys=[created_by],
    )

    def __repr__(self):
        return f'<PricingPlan {self.name}>'

    @classmethod
    def get_active_plans(cls):
        """Return all active plans ordered by ``order_index``."""
        active = cls.query.filter_by(is_active=True)
        return active.order_by(cls.order_index).all()

    @classmethod
    def get_popular_plan(cls):
        """Return the first active plan flagged as popular, or None."""
        return cls.query.filter_by(is_active=True, is_popular=True).first()

    def get_storage_quota_bytes(self):
        """Return the storage quota in bytes; 0 stands for unlimited."""
        gigabytes = self.storage_quota_gb
        if gigabytes == 0:
            return 0  # Unlimited
        return gigabytes * 1024 * 1024 * 1024  # GB -> bytes

    def _lookup_quota(self, quota_type):
        """Return ``(limit, suffix)`` for a known quota type, else None."""
        for column_name, suffix in self._QUOTA_SUFFIXES:
            if quota_type == column_name:
                return getattr(self, column_name), suffix
        return None

    def format_quota_display(self, quota_type):
        """Return a display string for one quota.

        'Unlimited' for a 0 limit, the limit followed by its unit otherwise,
        and 'Unknown' for an unrecognised ``quota_type``.
        """
        found = self._lookup_quota(quota_type)
        if found is None:
            return 'Unknown'
        limit, suffix = found
        if limit == 0:
            return 'Unlimited'
        return f'{limit}{suffix}'

    def check_quota(self, quota_type, current_count):
        """Return True while ``current_count`` is below the limit.

        Also True for a 0 (unlimited) limit and for an unrecognised
        ``quota_type``.
        """
        found = self._lookup_quota(quota_type)
        if found is None:
            return True
        limit = found[0]
        return limit == 0 or current_count < limit

    def get_quota_remaining(self, quota_type, current_count):
        """Return how much of a quota is left, never below 0.

        ``float('inf')`` for a 0 (unlimited) limit; 0 for an unrecognised
        ``quota_type``.
        """
        found = self._lookup_quota(quota_type)
        if found is None:
            return 0
        limit = found[0]
        if limit == 0:
            return float('inf')
        return max(0, limit - current_count)
class Customer(db.Model):
    """A customer record: contact data, addresses, tax ID and subscription state."""

    __tablename__ = 'customer'

    id = db.Column(db.Integer, primary_key=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
        nullable=False,
    )

    # Contact details (email is indexed but not declared unique)
    email = db.Column(db.String(150), nullable=False, index=True)
    name = db.Column(db.String(150))
    phone = db.Column(db.String(50))

    # Billing address
    billing_address_line1 = db.Column(db.String(255))
    billing_address_line2 = db.Column(db.String(255))
    billing_city = db.Column(db.String(100))
    billing_state = db.Column(db.String(100))
    billing_postal_code = db.Column(db.String(20))
    billing_country = db.Column(db.String(100))

    # Shipping address
    shipping_address_line1 = db.Column(db.String(255))
    shipping_address_line2 = db.Column(db.String(255))
    shipping_city = db.Column(db.String(100))
    shipping_state = db.Column(db.String(100))
    shipping_postal_code = db.Column(db.String(20))
    shipping_country = db.Column(db.String(100))

    # Tax identification
    tax_id_type = db.Column(db.String(50))
    tax_id_value = db.Column(db.String(100))

    # Stripe identifiers and subscription state
    stripe_customer_id = db.Column(db.String(255))
    stripe_subscription_id = db.Column(db.String(255))
    subscription_status = db.Column(db.String(50))
    subscription_plan_id = db.Column(
        db.Integer,
        db.ForeignKey('pricing_plans.id'),
    )
    subscription_billing_cycle = db.Column(db.String(20))
    subscription_current_period_start = db.Column(db.DateTime)
    subscription_current_period_end = db.Column(db.DateTime)

    # Subscribed pricing plan; exposed on PricingPlan as ``customers``.
    plan = db.relationship('PricingPlan', backref='customers')

    def __repr__(self):
        return f'<Customer {self.email}>'