All checks were successful
Chore App Build, Test, and Push Docker Images / build-and-push (push) Successful in 3m23s
- Introduced a dual-token system for user authentication: a short-lived access token and a long-lived rotating refresh token.
- Created a new RefreshToken model to manage refresh tokens securely.
- Updated auth_api.py to handle login, refresh, and logout processes with the new token system.
- Enhanced security measures including token rotation and theft detection.
- Updated frontend to handle token refresh on 401 errors and adjusted SSE authentication.
- Removed CORS middleware as it's unnecessary behind the nginx proxy.
- Added tests to ensure functionality and security of the new token system.
97 lines
3.1 KiB
Python
97 lines
3.1 KiB
Python
import jwt
|
|
import re
|
|
from functools import wraps
|
|
from db.db import users_db
|
|
from tinydb import Query
|
|
from flask import request, current_app, jsonify
|
|
|
|
from events.sse import send_event_to_user
|
|
from models.user import User
|
|
|
|
|
|
def normalize_email(email: str) -> str:
    """Return a canonical form of *email* for uniqueness comparisons.

    The address is stripped of surrounding whitespace and lowercased. For
    the ``gmail.com`` and ``googlemail.com`` domains the local part is
    additionally cut at the first ``+`` and has every ``.`` removed. The
    domain itself is never rewritten. Input without an ``@`` is returned
    trimmed and lowercased, with no further changes.
    """
    cleaned = email.strip().lower()

    # partition() splits on the first '@'; an empty separator means none was found.
    mailbox, at_sign, host = cleaned.partition('@')
    if not at_sign:
        return cleaned

    if host == 'gmail.com' or host == 'googlemail.com':
        # Drop the "+alias" suffix first, then the dots in what remains.
        mailbox = mailbox.partition('+')[0].replace('.', '')

    return mailbox + '@' + host
|
|
|
|
def sanitize_email(email):
    """Return *email* with ``@`` spelled as ``_at_`` and ``.`` as ``_dot_``.

    All other characters are left untouched. The intended use of the result
    is not visible in this module.
    """
    # One-pass character substitution; equivalent to replacing '@' and then '.'.
    substitutions = str.maketrans({'@': '_at_', '.': '_dot_'})
    return email.translate(substitutions)
|
|
|
|
def get_current_user_id():
    """Resolve the caller's user id from the ``access_token`` cookie.

    Returns the token's ``user_id`` claim only when all of the following hold:

    * the cookie is present and non-empty;
    * it decodes as an HS256 JWT signed with the app's ``SECRET_KEY``;
    * the ``user_id`` claim is truthy;
    * ``users_db`` holds a record with that id;
    * the token's ``token_version`` claim equals the record's
      ``token_version`` (each side defaults to 0 when missing).

    In every other case ``None`` is returned.
    """
    raw_token = request.cookies.get('access_token')
    if not raw_token:
        return None

    try:
        claims = jwt.decode(raw_token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Any token PyJWT refuses to decode is treated as "not logged in".
        return None

    uid = claims.get('user_id')
    if not uid:
        return None

    record = users_db.get(Query().id == uid)
    if not record:
        return None

    # A version mismatch means the token no longer matches the stored user state.
    if claims.get('token_version', 0) != record.get('token_version', 0):
        return None

    return uid
|
|
|
|
def get_validated_user_id():
    """Return the current user's id if it still maps to a record in ``users_db``.

    The id comes from ``get_current_user_id()``. ``None`` is returned when
    that call yields nothing or when the lookup in ``users_db`` finds no
    matching record.
    """
    uid = get_current_user_id()
    if uid and users_db.get(Query().id == uid):
        return uid
    return None
|
|
|
|
def send_event_for_current_user(event):
    """Forward *event* to the current user through ``send_event_to_user``.

    Returns ``None`` once the event has been handed off. When no current
    user can be resolved, nothing is sent and a
    ``(JSON {'error': 'Unauthorized'}, 401)`` pair is returned instead.
    """
    recipient_id = get_current_user_id()
    if recipient_id:
        send_event_to_user(recipient_id, event)
        return None
    return jsonify({'error': 'Unauthorized'}), 401
|
|
|
|
|
|
def admin_required(f):
    """
    Decorator to require admin role for endpoints.

    Validates the JWT from the ``access_token`` cookie, loads the matching
    user from ``users_db``, and only calls the wrapped endpoint when that
    user's role is ``'admin'``.

    On failure the endpoint is not called and a ``(json, status)`` pair is
    returned:

    * 401 ``AUTH_REQUIRED``  - the cookie is missing.
    * 401 ``TOKEN_EXPIRED``  - the token's signature has expired.
    * 401 ``INVALID_TOKEN``  - the token cannot be decoded, has no
      ``user_id`` claim, or its ``token_version`` claim differs from the
      stored user's ``token_version`` (each defaults to 0 when missing).
    * 404 ``USER_NOT_FOUND`` - no user record has that id.
    * 403 ``ADMIN_REQUIRED`` - the user exists but is not an admin.

    On success ``request.current_user`` (a ``User``) and
    ``request.admin_user_id`` are set before the endpoint runs.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        token = request.cookies.get('access_token')
        if not token:
            return jsonify({'error': 'Authentication required', 'code': 'AUTH_REQUIRED'}), 401

        # Keep the try body to the one call that can raise JWT errors.
        try:
            payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token expired', 'code': 'TOKEN_EXPIRED'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token', 'code': 'INVALID_TOKEN'}), 401

        user_id = payload.get('user_id')
        if not user_id:
            return jsonify({'error': 'Invalid token', 'code': 'INVALID_TOKEN'}), 401

        user_dict = users_db.get(Query().id == user_id)
        if not user_dict:
            return jsonify({'error': 'User not found', 'code': 'USER_NOT_FOUND'}), 404

        # Same token_version rule as get_current_user_id: a token whose version
        # no longer matches the stored user must not grant admin access.
        if payload.get('token_version', 0) != user_dict.get('token_version', 0):
            return jsonify({'error': 'Invalid token', 'code': 'INVALID_TOKEN'}), 401

        user = User.from_dict(user_dict)
        if user.role != 'admin':
            return jsonify({'error': 'Admin access required', 'code': 'ADMIN_REQUIRED'}), 403

        # Store user info in request context for the endpoint
        request.current_user = user
        request.admin_user_id = user_id

        return f(*args, **kwargs)

    return decorated_function