All checks were successful
Chore App Build, Test, and Push Docker Images / build-and-push (push) Successful in 2m29s
463 lines
18 KiB
Python
463 lines
18 KiB
Python
import hashlib
|
|
import logging
|
|
import secrets
|
|
import uuid
|
|
import jwt
|
|
from datetime import datetime, timedelta, timezone
|
|
from models.user import User
|
|
from models.refresh_token import RefreshToken
|
|
from flask import Blueprint, request, jsonify, current_app
|
|
from tinydb import Query
|
|
import os
|
|
import utils.email_sender as email_sender
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
from api.utils import sanitize_email
|
|
from config.paths import get_user_image_dir
|
|
from events.sse import send_event_to_user
|
|
from events.types.event import Event
|
|
from events.types.event_types import EventType
|
|
from events.types.payload import Payload
|
|
|
|
from api.error_codes import (
|
|
MISSING_FIELDS, EMAIL_EXISTS, MISSING_TOKEN, INVALID_TOKEN, TOKEN_TIMESTAMP_MISSING,
|
|
TOKEN_EXPIRED, ALREADY_VERIFIED, MISSING_EMAIL, USER_NOT_FOUND, MISSING_EMAIL_OR_PASSWORD,
|
|
INVALID_CREDENTIALS, NOT_VERIFIED, ACCOUNT_MARKED_FOR_DELETION,
|
|
REFRESH_TOKEN_REUSE, REFRESH_TOKEN_EXPIRED, MISSING_REFRESH_TOKEN,
|
|
)
|
|
from db.db import users_db, refresh_tokens_db
|
|
from api.utils import normalize_email
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Blueprint on which the authentication routes below are registered.
auth_api = Blueprint('auth_api', __name__)

# Reusable TinyDB query builders for user and refresh-token lookups.
UserQuery = Query()
TokenQuery = Query()

# Token lifetimes, all expressed in minutes.
TOKEN_EXPIRY_MINUTES = 4 * 60  # email-verification token, checked in verify()
RESET_PASSWORD_TOKEN_EXPIRY_MINUTES = 10  # password-reset token
ACCESS_TOKEN_EXPIRY_MINUTES = 15  # JWT access token
|
|
|
|
|
|
def send_verification_email(to_email, token):
    """Send the account-verification email for ``token`` to ``to_email``.

    Thin delegate to ``email_sender.send_verification_email``; the delegate's
    return value is discarded.
    """
    email_sender.send_verification_email(to_email, token)
|
|
|
|
def send_reset_password_email(to_email, token):
    """Send the password-reset email for ``token`` to ``to_email``.

    Thin delegate to ``email_sender.send_reset_password_email``; the delegate's
    return value is discarded.
    """
    email_sender.send_reset_password_email(to_email, token)
|
|
|
|
|
|
def _hash_token(raw_token: str) -> str:
    """Return the hex-encoded SHA-256 digest of a raw refresh token.

    The token is UTF-8 encoded before hashing, so only this digest (never the
    raw token) needs to be stored or compared.
    """
    digest = hashlib.sha256()
    digest.update(raw_token.encode('utf-8'))
    return digest.hexdigest()
|
|
|
|
|
|
def _create_access_token(user: User) -> str:
    """Create a short-lived, HS256-signed JWT access token for ``user``.

    The payload carries the user's email, id and current ``token_version``
    plus an ``exp`` claim ``ACCESS_TOKEN_EXPIRY_MINUTES`` from now. The token
    is signed with the application's ``SECRET_KEY``.

    Args:
        user: The user the token is issued for.

    Returns:
        The encoded JWT.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated, and the rest of
    # this module already uses aware UTC datetimes.
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRY_MINUTES)
    payload = {
        'email': user.email,
        'user_id': user.id,
        'token_version': user.token_version,
        'exp': expires_at,
    }
    return jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')
|
|
|
|
|
|
def _create_refresh_token(user_id: str, token_family: str | None = None) -> tuple[str, RefreshToken]:
    """Mint a refresh token, persist its record, and return both.

    Only the SHA-256 hash of the raw token is written to ``refresh_tokens_db``;
    the raw value is returned to the caller.

    Args:
        user_id: Owner of the token.
        token_family: Family to attach the token to. When falsy, a fresh UUID4
            family is generated (login); otherwise the given family is reused
            (rotation).

    Returns:
        ``(raw_token, record)`` where ``record`` is the stored ``RefreshToken``.
    """
    lifetime = timedelta(days=current_app.config['REFRESH_TOKEN_EXPIRY_DAYS'])
    raw_token = secrets.token_urlsafe(32)

    record = RefreshToken(
        user_id=user_id,
        token_hash=_hash_token(raw_token),
        token_family=token_family if token_family else str(uuid.uuid4()),
        expires_at=(datetime.now(timezone.utc) + lifetime).isoformat(),
        is_used=False,
    )
    refresh_tokens_db.insert(record.to_dict())
    return raw_token, record
|
|
|
|
|
|
def _set_auth_cookies(resp, access_token: str, raw_refresh_token: str):
    """Attach the access-token and refresh-token cookies to ``resp``.

    Both cookies are HttpOnly, Secure and SameSite=Strict. The refresh cookie
    additionally gets a ``max_age`` derived from ``REFRESH_TOKEN_EXPIRY_DAYS``
    and is scoped to the ``/api/auth`` path.
    """
    hardening = {'httponly': True, 'secure': True, 'samesite': 'Strict'}
    refresh_max_age = current_app.config['REFRESH_TOKEN_EXPIRY_DAYS'] * 24 * 3600

    resp.set_cookie('access_token', access_token, **hardening)
    resp.set_cookie(
        'refresh_token',
        raw_refresh_token,
        max_age=refresh_max_age,
        path='/api/auth',
        **hardening,
    )
|
|
|
|
|
|
def _clear_auth_cookies(resp):
    """Overwrite both auth cookies on ``resp`` with empty, already-expired values.

    The refresh cookie is cleared with the same ``/api/auth`` path it is set
    with in ``_set_auth_cookies``.
    """
    cookie_specs = (
        ('access_token', {}),
        ('refresh_token', {'path': '/api/auth'}),
    )
    for cookie_name, scope in cookie_specs:
        resp.set_cookie(
            cookie_name, '',
            expires=0, httponly=True, secure=True, samesite='Strict',
            **scope,
        )
|
|
|
|
|
|
def _purge_expired_tokens(user_id: str):
    """Remove a user's expired refresh tokens to prevent unbounded DB growth.

    A token is considered stale when its ``expires_at`` is in the past, or
    when ``expires_at`` is missing or cannot be parsed as an ISO-8601
    timestamp. Naive timestamps are interpreted as UTC.

    Args:
        user_id: The user whose refresh tokens are inspected.
    """
    now = datetime.now(timezone.utc)
    stale_ids = []

    for record in refresh_tokens_db.search(TokenQuery.user_id == user_id):
        try:
            expires_at = datetime.fromisoformat(record['expires_at'])
        except (ValueError, KeyError, TypeError):
            # Missing, malformed, or non-string expiry (fromisoformat raises
            # TypeError for e.g. None): the record can never be validated.
            stale_ids.append(record['id'])
            continue

        if expires_at.tzinfo is None:
            expires_at = expires_at.replace(tzinfo=timezone.utc)
        if now > expires_at:
            stale_ids.append(record['id'])

    # Delete everything in one call instead of issuing one remove per token.
    if stale_ids:
        refresh_tokens_db.remove(TokenQuery.id.one_of(stale_ids))
|
|
|
|
@auth_api.route('/signup', methods=['POST'])
def signup():
    """Register a new, unverified user and email them a verification token.

    Expects a JSON object with ``first_name``, ``last_name``, ``email`` and
    ``password``.

    Returns:
        201 once the user is stored and the verification email has been sent.
        400 with ``MISSING_FIELDS`` when the body is not a JSON object, a
        required field is absent, or ``email``/``password`` is not a non-empty
        string. 400 with ``EMAIL_EXISTS`` when the normalized email is already
        registered, or 403 with ``ACCOUNT_MARKED_FOR_DELETION`` when that
        existing account is marked for deletion.
    """
    data = request.get_json()
    # A body of JSON ``null`` or a list would otherwise blow up on the
    # membership test and ``.get`` calls below; treat it as "nothing supplied".
    if not isinstance(data, dict):
        data = {}

    required_fields = ['first_name', 'last_name', 'email', 'password']
    has_all_fields = all(field in data for field in required_fields)
    # Presence alone is not enough for credentials: an empty or non-string
    # email/password must never create an account.
    has_credentials = all(
        isinstance(data.get(field), str) and data[field]
        for field in ('email', 'password')
    )
    if not (has_all_fields and has_credentials):
        return jsonify({'error': 'Missing required fields', 'code': MISSING_FIELDS}), 400

    norm_email = normalize_email(data['email'])

    existing = users_db.get(UserQuery.email == norm_email)
    if existing:
        user = User.from_dict(existing)
        if user.marked_for_deletion:
            return jsonify({'error': 'Account marked for deletion', 'code': ACCOUNT_MARKED_FOR_DELETION}), 403
        return jsonify({'error': 'Email already exists', 'code': EMAIL_EXISTS}), 400

    token = secrets.token_urlsafe(32)
    # Stored as a naive UTC ISO string (same format as before) because
    # verify() re-attaches UTC when parsing it; datetime.utcnow() itself is
    # deprecated.
    now_iso = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    user = User(
        first_name=data['first_name'],
        last_name=data['last_name'],
        email=norm_email,
        password=generate_password_hash(data['password']),
        verified=False,
        verify_token=token,
        verify_token_created=now_iso,
        image_id="boy01",
    )
    users_db.insert(user.to_dict())
    send_verification_email(norm_email, token)
    return jsonify({'message': 'User created, verification email sent'}), 201
|
|
|
|
@auth_api.route('/verify', methods=['GET'])
def verify():
    """Confirm an account from the ``token`` query parameter.

    On success the user is marked verified, the verification token and its
    timestamp are cleared, and the user's image directory is created if the
    user has an email (otherwise an error is logged).

    Returns:
        A JSON object with ``status``, ``reason`` and ``code``. Status is 200
        on success and 400 for a missing, unknown or expired token, a token
        without a timestamp, or an account marked for deletion.
    """
    def failure(reason, code):
        # Every rejection shares the same payload shape and HTTP status.
        return jsonify({'status': 'error', 'reason': reason, 'code': code}), 400

    token = request.args.get('token')
    if not token:
        return failure('Missing token', MISSING_TOKEN)

    matched = users_db.get(Query().verify_token == token)
    user = User.from_dict(matched) if matched else None
    if not user:
        return failure('Invalid token', INVALID_TOKEN)
    if user.marked_for_deletion:
        return failure('Account marked for deletion', ACCOUNT_MARKED_FOR_DELETION)

    issued_str = user.verify_token_created
    if not issued_str:
        return failure('Token timestamp missing', TOKEN_TIMESTAMP_MISSING)

    # The stored timestamp is tagged as UTC before comparing with aware "now".
    issued_at = datetime.fromisoformat(issued_str).replace(tzinfo=timezone.utc)
    token_age = datetime.now(timezone.utc) - issued_at
    if token_age > timedelta(minutes=TOKEN_EXPIRY_MINUTES):
        return failure('Token expired', TOKEN_EXPIRED)

    user.verified = True
    user.verify_token = None
    user.verify_token_created = None
    users_db.update(user.to_dict(), Query().email == user.email)

    if user.email:
        os.makedirs(get_user_image_dir(user.id), exist_ok=True)
    else:
        logger.error("Verified user has no email field.")

    return jsonify({'status': 'success', 'reason': '', 'code': ''}), 200
|
|
|
|
@auth_api.route('/resend-verify', methods=['POST'])
def resend_verify():
    """Issue a fresh verification token and email it to an unverified user.

    Expects a JSON object with ``email``.

    Returns:
        200 once the new token is stored and the email has been sent.
        400 with ``MISSING_EMAIL`` when the body is not a JSON object or
        ``email`` is not a non-empty string. 404 with ``USER_NOT_FOUND`` for
        an unknown email. 403 with ``ACCOUNT_MARKED_FOR_DELETION`` for a
        deletion-marked account. 400 with ``ALREADY_VERIFIED`` when the
        account is already verified.
    """
    data = request.get_json()
    # JSON ``null`` or a list has no ``.get``; treat it as an empty object.
    if not isinstance(data, dict):
        data = {}

    email = data.get('email', '')
    if not isinstance(email, str) or not email:
        return jsonify({'error': 'Missing email', 'code': MISSING_EMAIL}), 400
    norm_email = normalize_email(email)

    user_dict = users_db.get(UserQuery.email == norm_email)
    user = User.from_dict(user_dict) if user_dict else None
    if not user:
        return jsonify({'error': 'User not found', 'code': USER_NOT_FOUND}), 404

    # verify() rejects tokens of deletion-marked accounts, so mailing a new
    # one would be pointless; answer the same way signup and login do.
    if user.marked_for_deletion:
        return jsonify({'error': 'Account marked for deletion', 'code': ACCOUNT_MARKED_FOR_DELETION}), 403

    if user.verified:
        return jsonify({'error': 'Account already verified', 'code': ALREADY_VERIFIED}), 400

    token = secrets.token_urlsafe(32)
    # Naive UTC ISO string, unchanged in format; verify() re-attaches UTC on
    # parse. Replaces the deprecated datetime.utcnow().
    now_iso = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    user.verify_token = token
    user.verify_token_created = now_iso
    users_db.update(user.to_dict(), UserQuery.email == norm_email)
    send_verification_email(norm_email, token)
    return jsonify({'message': 'Verification email resent'}), 200
|
|
|
|
@auth_api.route('/login', methods=['POST'])
def login():
    """Authenticate with email and password and set the auth cookies.

    Expects a JSON object with ``email`` and ``password``. On success the
    user's expired refresh tokens are purged, a new access token and a refresh
    token in a new family are created, and both are set as cookies.

    Returns:
        200 with the auth cookies set on success.
        400 with ``MISSING_EMAIL_OR_PASSWORD`` when the body is not a JSON
        object or either field is not a non-empty string. 401 with
        ``INVALID_CREDENTIALS`` for an unknown email or wrong password. 403
        with ``NOT_VERIFIED`` or ``ACCOUNT_MARKED_FOR_DELETION`` when the
        account may not log in.
    """
    data = request.get_json()
    # JSON ``null`` or a list has no ``.get``; treat it as an empty object.
    if not isinstance(data, dict):
        data = {}

    email = data.get('email', '')
    password = data.get('password')
    # Non-string values (numbers, objects, ...) are rejected here rather than
    # being handed to normalize_email / check_password_hash.
    if not isinstance(email, str) or not isinstance(password, str) or not email or not password:
        return jsonify({'error': 'Missing email or password', 'code': MISSING_EMAIL_OR_PASSWORD}), 400
    norm_email = normalize_email(email)

    user_dict = users_db.get(UserQuery.email == norm_email)
    user = User.from_dict(user_dict) if user_dict else None
    if not user or not check_password_hash(user.password, password):
        return jsonify({'error': 'Invalid credentials', 'code': INVALID_CREDENTIALS}), 401

    if not user.verified:
        return jsonify({'error': 'This account has not verified', 'code': NOT_VERIFIED}), 403

    # Block login for marked accounts
    if user.marked_for_deletion:
        return jsonify({'error': 'This account has been marked for deletion and cannot be accessed.', 'code': ACCOUNT_MARKED_FOR_DELETION}), 403

    # Purge expired refresh tokens for this user
    _purge_expired_tokens(user.id)

    # Create access token (short-lived JWT)
    access_token = _create_access_token(user)

    # Create refresh token (long-lived, new family for fresh login)
    raw_refresh, _ = _create_refresh_token(user.id)

    resp = jsonify({'message': 'Login successful'})
    _set_auth_cookies(resp, access_token, raw_refresh)
    return resp, 200
|
|
|
|
@auth_api.route('/me', methods=['GET'])
def me():
    """Return the profile of the user identified by the access-token cookie.

    Returns:
        200 with ``email``, ``id``, ``first_name``, ``last_name`` and
        ``verified``. 401 when the cookie is missing, the JWT is expired or
        invalid, or its ``token_version`` no longer matches the user's. 404
        when the user does not exist. 403 when the account is marked for
        deletion.
    """
    access_token = request.cookies.get('access_token')
    if not access_token:
        return jsonify({'error': 'Missing token', 'code': MISSING_TOKEN}), 401

    # Only the decode step can raise the JWT errors handled here.
    try:
        claims = jwt.decode(access_token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        return jsonify({'error': 'Token expired', 'code': TOKEN_EXPIRED}), 401
    except jwt.InvalidTokenError:
        return jsonify({'error': 'Invalid token', 'code': INVALID_TOKEN}), 401

    user_id = claims.get('user_id', '')
    record = users_db.get(UserQuery.id == user_id)
    user = User.from_dict(record) if record else None
    if not user:
        return jsonify({'error': 'User not found', 'code': USER_NOT_FOUND}), 404

    # A stale token_version means the token predates the user's current one.
    if claims.get('token_version', 0) != user.token_version:
        return jsonify({'error': 'Invalid token', 'code': INVALID_TOKEN}), 401

    if user.marked_for_deletion:
        return jsonify({'error': 'Account marked for deletion', 'code': ACCOUNT_MARKED_FOR_DELETION}), 403

    profile = {
        'email': user.email,
        'id': user_id,
        'first_name': user.first_name,
        'last_name': user.last_name,
        'verified': user.verified,
    }
    return jsonify(profile), 200
|
|
|
|
@auth_api.route('/request-password-reset', methods=['POST'])
def request_password_reset():
    """Start a password reset by emailing a reset token to a registered user.

    Expects a JSON object with ``email``. For an unknown email nothing is sent
    but the same success message is returned.

    Returns:
        200 with a generic message whether or not the email is registered.
        400 with ``MISSING_EMAIL`` when the body is not a JSON object or
        ``email`` is not a non-empty string. 403 with
        ``ACCOUNT_MARKED_FOR_DELETION`` for a deletion-marked account.
    """
    data = request.get_json()
    # JSON ``null`` or a list has no ``.get``; treat it as an empty object.
    if not isinstance(data, dict):
        data = {}

    # Validate before normalizing so a missing/null/non-string email is
    # reported as MISSING_EMAIL instead of reaching normalize_email.
    email = data.get('email', '')
    if not isinstance(email, str) or not email:
        return jsonify({'error': 'Missing email', 'code': MISSING_EMAIL}), 400
    norm_email = normalize_email(email)

    success_msg = 'If this email is registered, you will receive a password reset link shortly.'

    user_dict = users_db.get(UserQuery.email == norm_email)
    user = User.from_dict(user_dict) if user_dict else None
    if user:
        # NOTE(review): this distinct 403 reveals that the email is registered,
        # unlike the generic success message below — confirm this is intended.
        if user.marked_for_deletion:
            return jsonify({'error': 'Account marked for deletion', 'code': ACCOUNT_MARKED_FOR_DELETION}), 403
        token = secrets.token_urlsafe(32)
        # Naive UTC ISO string, unchanged in format; the reset endpoints
        # re-attach UTC when parsing. Replaces the deprecated utcnow().
        now_iso = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        user.reset_token = token
        user.reset_token_created = now_iso
        users_db.update(user.to_dict(), UserQuery.email == norm_email)
        send_reset_password_email(norm_email, token)

    return jsonify({'message': success_msg}), 200
|
|
|
|
@auth_api.route('/validate-reset-token', methods=['GET'])
def validate_reset_token():
    """Check whether the ``token`` query parameter is a usable reset token.

    Nothing is modified; the token stays valid after this call.

    Returns:
        200 when the token belongs to a user and is at most
        ``RESET_PASSWORD_TOKEN_EXPIRY_MINUTES`` old. 400 with
        ``MISSING_TOKEN``, ``INVALID_TOKEN``, ``TOKEN_TIMESTAMP_MISSING`` or
        ``TOKEN_EXPIRED`` otherwise.
    """
    def bad_request(message, code):
        # All failures here share the payload shape and the 400 status.
        return jsonify({'error': message, 'code': code}), 400

    token = request.args.get('token')
    if not token:
        return bad_request('Missing token', MISSING_TOKEN)

    record = users_db.get(UserQuery.reset_token == token)
    user = User.from_dict(record) if record else None
    if not user:
        return bad_request('Invalid token', INVALID_TOKEN)

    issued_str = user.reset_token_created
    if not issued_str:
        return bad_request('Token timestamp missing', TOKEN_TIMESTAMP_MISSING)

    # The stored timestamp is tagged as UTC before comparing with aware "now".
    issued_at = datetime.fromisoformat(issued_str).replace(tzinfo=timezone.utc)
    token_age = datetime.now(timezone.utc) - issued_at
    if token_age > timedelta(minutes=RESET_PASSWORD_TOKEN_EXPIRY_MINUTES):
        return bad_request('Token expired', TOKEN_EXPIRED)

    return jsonify({'message': 'Token is valid'}), 200
|
|
|
|
@auth_api.route('/reset-password', methods=['POST'])
def reset_password():
    """Set a new password using a reset token and end all existing sessions.

    Expects a JSON object with ``token`` and ``password``. On success the
    password hash is replaced, the reset token is cleared, ``token_version``
    is incremented, all of the user's refresh tokens are removed, a
    ``FORCE_LOGOUT`` event is sent to the user, and the auth cookies on the
    response are cleared.

    Returns:
        200 when the password has been reset. 400 with ``MISSING_FIELDS`` when
        the body is not a JSON object or ``token``/``password`` is not a
        non-empty string. 400 with ``INVALID_TOKEN``,
        ``TOKEN_TIMESTAMP_MISSING`` or ``TOKEN_EXPIRED`` when the token cannot
        be used.
    """
    data = request.get_json()
    # JSON ``null`` or a list has no ``.get``; treat it as an empty object.
    if not isinstance(data, dict):
        data = {}

    token = data.get('token')
    new_password = data.get('password')

    # Non-string values are rejected here rather than being used as a DB
    # lookup key or handed to generate_password_hash.
    if not isinstance(token, str) or not isinstance(new_password, str) or not token or not new_password:
        # 'code' added so this error matches every other error response here.
        return jsonify({'error': 'Missing token or password', 'code': MISSING_FIELDS}), 400

    user_dict = users_db.get(UserQuery.reset_token == token)
    user = User.from_dict(user_dict) if user_dict else None
    if not user:
        return jsonify({'error': 'Invalid token', 'code': INVALID_TOKEN}), 400

    created_str = user.reset_token_created
    if not created_str:
        return jsonify({'error': 'Token timestamp missing', 'code': TOKEN_TIMESTAMP_MISSING}), 400

    # The stored timestamp is tagged as UTC before comparing with aware "now".
    created_dt = datetime.fromisoformat(created_str).replace(tzinfo=timezone.utc)
    if datetime.now(timezone.utc) - created_dt > timedelta(minutes=RESET_PASSWORD_TOKEN_EXPIRY_MINUTES):
        return jsonify({'error': 'Token expired', 'code': TOKEN_EXPIRED}), 400

    user.password = generate_password_hash(new_password)
    user.reset_token = None
    user.reset_token_created = None
    # Bumping the version makes /me reject access tokens issued before now.
    user.token_version += 1
    users_db.update(user.to_dict(), UserQuery.email == user.email)

    # Invalidate ALL refresh tokens for this user
    refresh_tokens_db.remove(TokenQuery.user_id == user.id)

    # Notify all active sessions (other tabs/devices) to sign out immediately
    send_event_to_user(user.id, Event(EventType.FORCE_LOGOUT.value, Payload({'reason': 'password_reset'})))

    resp = jsonify({'message': 'Password has been reset'})
    _clear_auth_cookies(resp)
    return resp, 200
|
|
|
|
|
|
@auth_api.route('/refresh', methods=['POST'])
def refresh():
    """Rotate the refresh token from the cookie and issue a new access token.

    The presented token is looked up by its hash. A token already marked as
    used is treated as a replay: every refresh token of that user is removed.
    Otherwise the token is marked used and replaced by a new one in the same
    family, and a new access token is issued.

    Returns:
        200 with the user's ``email``, ``id``, ``first_name``, ``last_name``
        and ``verified`` plus fresh auth cookies. 401 when the cookie is
        missing, or (with cookies cleared) when the token is unknown, reused,
        expired, malformed, or its user no longer exists. 403 (cookies
        cleared) when the account is marked for deletion.
    """
    def rejected(message, code, status=401):
        # Error response that also clears the caller's auth cookies.
        failure = jsonify({'error': message, 'code': code})
        _clear_auth_cookies(failure)
        return failure, status

    raw_token = request.cookies.get('refresh_token')
    if not raw_token:
        return jsonify({'error': 'Missing refresh token', 'code': MISSING_REFRESH_TOKEN}), 401

    stored = refresh_tokens_db.get(TokenQuery.token_hash == _hash_token(raw_token))
    if not stored:
        # Token not found — could be invalid or already purged
        return rejected('Invalid token', INVALID_TOKEN)

    token_record = RefreshToken.from_dict(stored)

    # THEFT DETECTION: token was already used (rotated out) but replayed
    if token_record.is_used:
        logger.warning(
            'Refresh token reuse detected! user_id=%s, family=%s, ip=%s — killing all sessions',
            token_record.user_id, token_record.token_family, request.remote_addr,
        )
        # Nuke ALL refresh tokens for this user
        refresh_tokens_db.remove(TokenQuery.user_id == token_record.user_id)
        return rejected('Token reuse detected, all sessions invalidated', REFRESH_TOKEN_REUSE)

    # Check expiry; naive timestamps are interpreted as UTC.
    try:
        expires_at = datetime.fromisoformat(token_record.expires_at)
        if expires_at.tzinfo is None:
            expires_at = expires_at.replace(tzinfo=timezone.utc)
        if datetime.now(timezone.utc) > expires_at:
            refresh_tokens_db.remove(TokenQuery.id == token_record.id)
            return rejected('Refresh token expired', REFRESH_TOKEN_EXPIRED)
    except ValueError:
        # Unparseable expiry: drop the record and reject the token.
        refresh_tokens_db.remove(TokenQuery.id == token_record.id)
        return rejected('Invalid token', INVALID_TOKEN)

    # Look up the user
    owner = users_db.get(UserQuery.id == token_record.user_id)
    user = User.from_dict(owner) if owner else None
    if not user:
        refresh_tokens_db.remove(TokenQuery.id == token_record.id)
        return rejected('User not found', USER_NOT_FOUND)

    if user.marked_for_deletion:
        refresh_tokens_db.remove(TokenQuery.user_id == user.id)
        return rejected('Account marked for deletion', ACCOUNT_MARKED_FOR_DELETION, status=403)

    # ROTATION: mark old token as used, create new one in same family
    refresh_tokens_db.update({'is_used': True}, TokenQuery.id == token_record.id)
    raw_new_refresh, _ = _create_refresh_token(user.id, token_family=token_record.token_family)

    # Issue new access token
    access_token = _create_access_token(user)

    profile = {
        'email': user.email,
        'id': user.id,
        'first_name': user.first_name,
        'last_name': user.last_name,
        'verified': user.verified,
    }
    resp = jsonify(profile)
    _set_auth_cookies(resp, access_token, raw_new_refresh)
    return resp, 200
|
|
|
|
|
|
@auth_api.route('/logout', methods=['POST'])
def logout():
    """Log out: drop the presented refresh token and clear the auth cookies.

    If a ``refresh_token`` cookie is present, the stored record with the
    matching hash is removed. The cookies are cleared either way.

    Returns:
        200 with a confirmation message.
    """
    presented = request.cookies.get('refresh_token')
    if presented:
        # Records are keyed by the token's hash, never the raw value.
        refresh_tokens_db.remove(TokenQuery.token_hash == _hash_token(presented))

    resp = jsonify({'message': 'Logged out'})
    _clear_auth_cookies(resp)
    return resp, 200
|