import jwt import re from functools import wraps from db.db import users_db from tinydb import Query from flask import request, current_app, jsonify from events.sse import send_event_to_user from models.user import User def normalize_email(email: str) -> str: """Normalize email for uniqueness checks (Gmail: remove dots and +aliases).""" email = email.strip().lower() if '@' not in email: return email local, domain = email.split('@', 1) if domain in ('gmail.com', 'googlemail.com'): local = local.split('+', 1)[0].replace('.', '') return f"{local}@{domain}" def sanitize_email(email): return email.replace('@', '_at_').replace('.', '_dot_') def get_current_user_id(): token = request.cookies.get('access_token') if not token: return None try: payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256']) user_id = payload.get('user_id') if not user_id: return None token_version = payload.get('token_version', 0) user = users_db.get(Query().id == user_id) if not user: return None if token_version != user.get('token_version', 0): return None return user_id except jwt.InvalidTokenError: return None def get_validated_user_id(): user_id = get_current_user_id() if not user_id or not users_db.get(Query().id == user_id): return None return user_id def send_event_for_current_user(event): user_id = get_current_user_id() if not user_id: return jsonify({'error': 'Unauthorized'}), 401 send_event_to_user(user_id, event) return None def admin_required(f): """ Decorator to require admin role for endpoints. Validates JWT from access_token cookie and checks admin role. """ @wraps(f) def decorated_function(*args, **kwargs): token = request.cookies.get('access_token') if not token: return jsonify({'error': 'Authentication required', 'code': 'AUTH_REQUIRED'}), 401 try: payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256']) user_id = payload.get('user_id') if not user_id: return jsonify({'error': 'Invalid token', 'code': 'INVALID_TOKEN'}), 401 user_dict = users_db.get(Query().id == user_id) if not user_dict: return jsonify({'error': 'User not found', 'code': 'USER_NOT_FOUND'}), 404 user = User.from_dict(user_dict) if user.role != 'admin': return jsonify({'error': 'Admin access required', 'code': 'ADMIN_REQUIRED'}), 403 # Store user info in request context for the endpoint request.current_user = user request.admin_user_id = user_id except jwt.ExpiredSignatureError: return jsonify({'error': 'Token expired', 'code': 'TOKEN_EXPIRED'}), 401 except jwt.InvalidTokenError: return jsonify({'error': 'Invalid token', 'code': 'INVALID_TOKEN'}), 401 return f(*args, **kwargs) return decorated_function