444 lines
15 KiB
Python
444 lines
15 KiB
Python
"""
|
|
Stripe utility functions for managing products, prices, and checkout sessions.
|
|
"""
|
|
import stripe
|
|
import os
|
|
from models import KeyValueSettings, PricingPlan
|
|
from flask import current_app, url_for
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def get_stripe_settings():
|
|
"""Get Stripe settings from database"""
|
|
stripe_settings = KeyValueSettings.get_value('stripe_settings')
|
|
if not stripe_settings:
|
|
return None
|
|
return stripe_settings
|
|
|
|
def configure_stripe():
|
|
"""Configure Stripe with API key from settings"""
|
|
stripe_settings = get_stripe_settings()
|
|
if not stripe_settings or not stripe_settings.get('secret_key'):
|
|
raise ValueError("Stripe secret key not configured")
|
|
|
|
stripe.api_key = stripe_settings['secret_key']
|
|
return stripe_settings
|
|
|
|
def create_stripe_product(plan):
|
|
"""
|
|
Create a Stripe product and prices for a pricing plan
|
|
|
|
Args:
|
|
plan: PricingPlan instance
|
|
|
|
Returns:
|
|
dict: Contains product_id, monthly_price_id, annual_price_id
|
|
"""
|
|
try:
|
|
configure_stripe()
|
|
|
|
# Create product
|
|
product = stripe.Product.create(
|
|
name=plan.name,
|
|
description=plan.description or f"DocuPulse {plan.name} Plan",
|
|
metadata={
|
|
'plan_id': plan.id,
|
|
'plan_name': plan.name
|
|
}
|
|
)
|
|
|
|
# Create monthly price
|
|
monthly_price = stripe.Price.create(
|
|
product=product.id,
|
|
unit_amount=int(plan.monthly_price * 100), # Convert to cents
|
|
currency='eur',
|
|
recurring={'interval': 'month'},
|
|
metadata={
|
|
'plan_id': plan.id,
|
|
'billing_cycle': 'monthly',
|
|
'plan_name': plan.name
|
|
}
|
|
)
|
|
|
|
# Create annual price
|
|
annual_price = stripe.Price.create(
|
|
product=product.id,
|
|
unit_amount=int(plan.annual_price * 100), # Convert to cents
|
|
currency='eur',
|
|
recurring={'interval': 'year'},
|
|
metadata={
|
|
'plan_id': plan.id,
|
|
'billing_cycle': 'annual',
|
|
'plan_name': plan.name
|
|
}
|
|
)
|
|
|
|
return {
|
|
'product_id': product.id,
|
|
'monthly_price_id': monthly_price.id,
|
|
'annual_price_id': annual_price.id
|
|
}
|
|
|
|
except stripe.error.StripeError as e:
|
|
logger.error(f"Stripe error creating product for plan {plan.name}: {str(e)}")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error creating Stripe product for plan {plan.name}: {str(e)}")
|
|
raise
|
|
|
|
def update_stripe_product(plan):
|
|
"""
|
|
Update an existing Stripe product and prices for a pricing plan
|
|
|
|
Args:
|
|
plan: PricingPlan instance with existing Stripe IDs
|
|
|
|
Returns:
|
|
dict: Updated product and price information
|
|
"""
|
|
try:
|
|
configure_stripe()
|
|
|
|
if not plan.stripe_product_id:
|
|
# If no product ID exists, create new product
|
|
return create_stripe_product(plan)
|
|
|
|
# Update product
|
|
product = stripe.Product.modify(
|
|
plan.stripe_product_id,
|
|
name=plan.name,
|
|
description=plan.description or f"DocuPulse {plan.name} Plan",
|
|
metadata={
|
|
'plan_id': plan.id,
|
|
'plan_name': plan.name
|
|
}
|
|
)
|
|
|
|
# Archive old prices and create new ones
|
|
new_prices = {}
|
|
|
|
# Handle monthly price
|
|
if plan.stripe_monthly_price_id:
|
|
try:
|
|
# Archive old monthly price
|
|
stripe.Price.modify(plan.stripe_monthly_price_id, active=False)
|
|
except stripe.error.StripeError:
|
|
pass # Price might not exist
|
|
|
|
# Create new monthly price
|
|
monthly_price = stripe.Price.create(
|
|
product=product.id,
|
|
unit_amount=int(plan.monthly_price * 100),
|
|
currency='eur',
|
|
recurring={'interval': 'month'},
|
|
metadata={
|
|
'plan_id': plan.id,
|
|
'billing_cycle': 'monthly',
|
|
'plan_name': plan.name
|
|
}
|
|
)
|
|
new_prices['monthly_price_id'] = monthly_price.id
|
|
|
|
# Handle annual price
|
|
if plan.stripe_annual_price_id:
|
|
try:
|
|
# Archive old annual price
|
|
stripe.Price.modify(plan.stripe_annual_price_id, active=False)
|
|
except stripe.error.StripeError:
|
|
pass # Price might not exist
|
|
|
|
# Create new annual price
|
|
annual_price = stripe.Price.create(
|
|
product=product.id,
|
|
unit_amount=int(plan.annual_price * 100),
|
|
currency='eur',
|
|
recurring={'interval': 'year'},
|
|
metadata={
|
|
'plan_id': plan.id,
|
|
'billing_cycle': 'annual',
|
|
'plan_name': plan.name
|
|
}
|
|
)
|
|
new_prices['annual_price_id'] = annual_price.id
|
|
|
|
return {
|
|
'product_id': product.id,
|
|
'monthly_price_id': new_prices['monthly_price_id'],
|
|
'annual_price_id': new_prices['annual_price_id']
|
|
}
|
|
|
|
except stripe.error.StripeError as e:
|
|
logger.error(f"Stripe error updating product for plan {plan.name}: {str(e)}")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error updating Stripe product for plan {plan.name}: {str(e)}")
|
|
raise
|
|
|
|
def create_checkout_session(plan_id, billing_cycle='monthly', success_url=None, cancel_url=None):
|
|
"""
|
|
Create a Stripe checkout session for a pricing plan
|
|
|
|
Args:
|
|
plan_id: ID of the PricingPlan
|
|
billing_cycle: 'monthly' or 'annual'
|
|
success_url: URL to redirect to on successful payment
|
|
cancel_url: URL to redirect to on cancellation
|
|
|
|
Returns:
|
|
str: Checkout session URL
|
|
"""
|
|
logger.info(f"=== CREATE CHECKOUT SESSION START ===")
|
|
logger.info(f"Plan ID: {plan_id}")
|
|
logger.info(f"Billing cycle: {billing_cycle}")
|
|
logger.info(f"Success URL: {success_url}")
|
|
logger.info(f"Cancel URL: {cancel_url}")
|
|
|
|
try:
|
|
configure_stripe()
|
|
logger.info("Stripe configured successfully")
|
|
|
|
plan = PricingPlan.query.get(plan_id)
|
|
logger.info(f"Plan lookup result: {plan}")
|
|
|
|
if not plan:
|
|
logger.error(f"Pricing plan with ID {plan_id} not found")
|
|
raise ValueError(f"Pricing plan with ID {plan_id} not found")
|
|
|
|
logger.info(f"Plan found: {plan.name}")
|
|
logger.info(f"Plan stripe_monthly_price_id: {plan.stripe_monthly_price_id}")
|
|
logger.info(f"Plan stripe_annual_price_id: {plan.stripe_annual_price_id}")
|
|
|
|
# Determine which price ID to use
|
|
if billing_cycle == 'monthly':
|
|
price_id = plan.stripe_monthly_price_id
|
|
if not price_id:
|
|
logger.error("Monthly price not configured for this plan")
|
|
raise ValueError("Monthly price not configured for this plan")
|
|
elif billing_cycle == 'annual':
|
|
price_id = plan.stripe_annual_price_id
|
|
if not price_id:
|
|
logger.error("Annual price not configured for this plan")
|
|
raise ValueError("Annual price not configured for this plan")
|
|
else:
|
|
logger.error(f"Invalid billing cycle: {billing_cycle}")
|
|
raise ValueError("Invalid billing cycle. Must be 'monthly' or 'annual'")
|
|
|
|
logger.info(f"Using price ID: {price_id}")
|
|
|
|
# Set default URLs if not provided
|
|
if not success_url:
|
|
success_url = url_for('main.dashboard', _external=True)
|
|
if not cancel_url:
|
|
cancel_url = url_for('main.public_home', _external=True)
|
|
|
|
logger.info(f"Final success URL: {success_url}")
|
|
logger.info(f"Final cancel URL: {cancel_url}")
|
|
|
|
# Create checkout session
|
|
session_data = {
|
|
'payment_method_types': ['card'],
|
|
'line_items': [{
|
|
'price': price_id,
|
|
'quantity': 1,
|
|
}],
|
|
'mode': 'subscription',
|
|
'success_url': f"{success_url}?session_id={{CHECKOUT_SESSION_ID}}",
|
|
'cancel_url': cancel_url,
|
|
'metadata': {
|
|
'plan_id': plan_id,
|
|
'plan_name': plan.name,
|
|
'billing_cycle': billing_cycle
|
|
},
|
|
'customer_email': None, # Will be collected during checkout
|
|
'allow_promotion_codes': True,
|
|
'billing_address_collection': 'required',
|
|
'phone_number_collection': {
|
|
'enabled': True
|
|
},
|
|
'automatic_tax': {
|
|
'enabled': True
|
|
},
|
|
'tax_id_collection': {
|
|
'enabled': True
|
|
}
|
|
}
|
|
|
|
logger.info(f"Creating Stripe session with data: {session_data}")
|
|
|
|
session = stripe.checkout.Session.create(**session_data)
|
|
|
|
logger.info(f"Stripe session created successfully: {session.id}")
|
|
logger.info(f"Session URL: {session.url}")
|
|
|
|
return session.url
|
|
|
|
except stripe.error.StripeError as e:
|
|
logger.error(f"Stripe error creating checkout session: {str(e)}")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error creating checkout session: {str(e)}")
|
|
raise
|
|
finally:
|
|
logger.info("=== CREATE CHECKOUT SESSION END ===")
|
|
|
|
def get_subscription_info(session_id):
|
|
"""
|
|
Get subscription information from a checkout session
|
|
|
|
Args:
|
|
session_id: Stripe checkout session ID
|
|
|
|
Returns:
|
|
dict: Subscription information
|
|
"""
|
|
try:
|
|
configure_stripe()
|
|
|
|
session = stripe.checkout.Session.retrieve(session_id)
|
|
|
|
if session.payment_status == 'paid':
|
|
subscription = stripe.Subscription.retrieve(session.subscription)
|
|
|
|
# Get customer details
|
|
customer_details = {}
|
|
if session.customer_details:
|
|
customer_details = {
|
|
'name': session.customer_details.name,
|
|
'email': session.customer_details.email,
|
|
'phone': session.customer_details.phone,
|
|
'address': {
|
|
'line1': session.customer_details.address.line1,
|
|
'line2': session.customer_details.address.line2,
|
|
'city': session.customer_details.address.city,
|
|
'state': session.customer_details.address.state,
|
|
'postal_code': session.customer_details.address.postal_code,
|
|
'country': session.customer_details.address.country
|
|
} if session.customer_details.address else None,
|
|
'shipping': {
|
|
'name': session.customer_details.shipping.name,
|
|
'address': {
|
|
'line1': session.customer_details.shipping.address.line1,
|
|
'line2': session.customer_details.shipping.address.line2,
|
|
'city': session.customer_details.shipping.address.city,
|
|
'state': session.customer_details.shipping.address.state,
|
|
'postal_code': session.customer_details.shipping.address.postal_code,
|
|
'country': session.customer_details.shipping.address.country
|
|
}
|
|
} if session.customer_details.shipping else None,
|
|
'tax_ids': [
|
|
{
|
|
'type': tax_id.type,
|
|
'value': tax_id.value
|
|
} for tax_id in session.customer_details.tax_ids
|
|
] if session.customer_details.tax_ids else []
|
|
}
|
|
|
|
return {
|
|
'session_id': session_id,
|
|
'subscription_id': subscription.id,
|
|
'customer_id': subscription.customer,
|
|
'status': subscription.status,
|
|
'plan_id': session.metadata.get('plan_id'),
|
|
'plan_name': session.metadata.get('plan_name'),
|
|
'billing_cycle': session.metadata.get('billing_cycle'),
|
|
'current_period_start': subscription.current_period_start,
|
|
'current_period_end': subscription.current_period_end,
|
|
'amount': subscription.items.data[0].price.unit_amount / 100, # Convert from cents
|
|
'currency': subscription.currency,
|
|
'customer_details': customer_details
|
|
}
|
|
else:
|
|
return {
|
|
'session_id': session_id,
|
|
'payment_status': session.payment_status,
|
|
'error': 'Payment not completed'
|
|
}
|
|
|
|
except stripe.error.StripeError as e:
|
|
logger.error(f"Stripe error getting subscription info: {str(e)}")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting subscription info: {str(e)}")
|
|
raise
|
|
|
|
def cancel_subscription(subscription_id):
|
|
"""
|
|
Cancel a Stripe subscription
|
|
|
|
Args:
|
|
subscription_id: Stripe subscription ID
|
|
|
|
Returns:
|
|
dict: Cancellation information
|
|
"""
|
|
try:
|
|
configure_stripe()
|
|
|
|
subscription = stripe.Subscription.modify(
|
|
subscription_id,
|
|
cancel_at_period_end=True
|
|
)
|
|
|
|
return {
|
|
'subscription_id': subscription_id,
|
|
'status': subscription.status,
|
|
'cancel_at_period_end': subscription.cancel_at_period_end,
|
|
'current_period_end': subscription.current_period_end
|
|
}
|
|
|
|
except stripe.error.StripeError as e:
|
|
logger.error(f"Stripe error canceling subscription: {str(e)}")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error canceling subscription: {str(e)}")
|
|
raise
|
|
|
|
def validate_stripe_keys():
|
|
"""
|
|
Validate that Stripe keys are properly configured
|
|
|
|
Returns:
|
|
dict: Validation result with status and message
|
|
"""
|
|
try:
|
|
stripe_settings = get_stripe_settings()
|
|
if not stripe_settings:
|
|
return {
|
|
'valid': False,
|
|
'message': 'Stripe settings not configured'
|
|
}
|
|
|
|
if not stripe_settings.get('secret_key'):
|
|
return {
|
|
'valid': False,
|
|
'message': 'Stripe secret key not configured'
|
|
}
|
|
|
|
if not stripe_settings.get('publishable_key'):
|
|
return {
|
|
'valid': False,
|
|
'message': 'Stripe publishable key not configured'
|
|
}
|
|
|
|
# Test the API key
|
|
configure_stripe()
|
|
account = stripe.Account.retrieve()
|
|
|
|
return {
|
|
'valid': True,
|
|
'message': 'Stripe configuration is valid',
|
|
'account_id': account.id,
|
|
'test_mode': stripe_settings.get('test_mode', False)
|
|
}
|
|
|
|
except stripe.error.AuthenticationError:
|
|
return {
|
|
'valid': False,
|
|
'message': 'Invalid Stripe API key'
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'valid': False,
|
|
'message': f'Error validating Stripe configuration: {str(e)}'
|
|
} |