Files
chore/backend/api/utils.py
Ryan Kegel ebaef16daf
All checks were successful
Chore App Build, Test, and Push Docker Images / build-and-push (push) Successful in 3m23s
feat: implement long-term user login with refresh tokens
- Introduced a dual-token system for user authentication: a short-lived access token and a long-lived rotating refresh token.
- Created a new RefreshToken model to manage refresh tokens securely.
- Updated auth_api.py to handle login, refresh, and logout processes with the new token system.
- Enhanced security measures including token rotation and theft detection.
- Updated frontend to handle token refresh on 401 errors and adjusted SSE authentication.
- Removed CORS middleware as it's unnecessary behind the nginx proxy.
- Added tests to ensure functionality and security of the new token system.
2026-03-01 19:27:25 -05:00

97 lines
3.1 KiB
Python

import jwt
import re
from functools import wraps
from db.db import users_db
from tinydb import Query
from flask import request, current_app, jsonify
from events.sse import send_event_to_user
from models.user import User
def normalize_email(email: str) -> str:
"""Normalize email for uniqueness checks (Gmail: remove dots and +aliases)."""
email = email.strip().lower()
if '@' not in email:
return email
local, domain = email.split('@', 1)
if domain in ('gmail.com', 'googlemail.com'):
local = local.split('+', 1)[0].replace('.', '')
return f"{local}@{domain}"
def sanitize_email(email):
return email.replace('@', '_at_').replace('.', '_dot_')
def get_current_user_id():
token = request.cookies.get('access_token')
if not token:
return None
try:
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
user_id = payload.get('user_id')
if not user_id:
return None
token_version = payload.get('token_version', 0)
user = users_db.get(Query().id == user_id)
if not user:
return None
if token_version != user.get('token_version', 0):
return None
return user_id
except jwt.InvalidTokenError:
return None
def get_validated_user_id():
user_id = get_current_user_id()
if not user_id or not users_db.get(Query().id == user_id):
return None
return user_id
def send_event_for_current_user(event):
user_id = get_current_user_id()
if not user_id:
return jsonify({'error': 'Unauthorized'}), 401
send_event_to_user(user_id, event)
return None
def admin_required(f):
"""
Decorator to require admin role for endpoints.
Validates JWT from access_token cookie and checks admin role.
"""
@wraps(f)
def decorated_function(*args, **kwargs):
token = request.cookies.get('access_token')
if not token:
return jsonify({'error': 'Authentication required', 'code': 'AUTH_REQUIRED'}), 401
try:
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
user_id = payload.get('user_id')
if not user_id:
return jsonify({'error': 'Invalid token', 'code': 'INVALID_TOKEN'}), 401
user_dict = users_db.get(Query().id == user_id)
if not user_dict:
return jsonify({'error': 'User not found', 'code': 'USER_NOT_FOUND'}), 404
user = User.from_dict(user_dict)
if user.role != 'admin':
return jsonify({'error': 'Admin access required', 'code': 'ADMIN_REQUIRED'}), 403
# Store user info in request context for the endpoint
request.current_user = user
request.admin_user_id = user_id
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token expired', 'code': 'TOKEN_EXPIRED'}), 401
except jwt.InvalidTokenError:
return jsonify({'error': 'Invalid token', 'code': 'INVALID_TOKEN'}), 401
return f(*args, **kwargs)
return decorated_function