import logging import secrets, jwt from datetime import datetime, timedelta, timezone from flask import Blueprint, request, jsonify, current_app from flask_mail import Mail, Message from tinydb import Query import os from api.utils import sanitize_email from config.paths import get_user_image_dir from api.error_codes import MISSING_FIELDS, EMAIL_EXISTS, MISSING_TOKEN, INVALID_TOKEN, TOKEN_TIMESTAMP_MISSING, \ TOKEN_EXPIRED, ALREADY_VERIFIED, MISSING_EMAIL, USER_NOT_FOUND, MISSING_EMAIL_OR_PASSWORD, INVALID_CREDENTIALS, \ NOT_VERIFIED from db.db import users_db logger = logging.getLogger(__name__) auth_api = Blueprint('auth_api', __name__) UserQuery = Query() mail = Mail() TOKEN_EXPIRY_MINUTES = 60*4 RESET_PASSWORD_TOKEN_EXPIRY_MINUTES = 10 def send_verification_email(to_email, token): verify_url = f"{current_app.config['FRONTEND_URL']}/auth/verify?token={token}" msg = Message( subject="Verify your account", recipients=[to_email], body=f"Click to verify your account: {verify_url}", sender=current_app.config['MAIL_DEFAULT_SENDER'] ) mail.send(msg) def send_reset_password_email(to_email, token): reset_url = f"{current_app.config['FRONTEND_URL']}/auth/reset-password?token={token}" msg = Message( subject="Reset your password", recipients=[to_email], body=f"Click to reset your password: {reset_url}", sender=current_app.config['MAIL_DEFAULT_SENDER'] ) mail.send(msg) @auth_api.route('/signup', methods=['POST']) def signup(): data = request.get_json() required_fields = ['first_name', 'last_name', 'email', 'password'] if not all(field in data for field in required_fields): return jsonify({'error': 'Missing required fields', 'code': MISSING_FIELDS}), 400 if users_db.search(UserQuery.email == data['email']): return jsonify({'error': 'Email already exists', 'code': EMAIL_EXISTS}), 400 token = secrets.token_urlsafe(32) now_iso = datetime.utcnow().isoformat() users_db.insert({ 'first_name': data['first_name'], 'last_name': data['last_name'], 'email': data['email'], 'password': data['password'], # Hash in production! 'verified': False, 'verify_token': token, 'verify_token_created': now_iso }) send_verification_email(data['email'], token) return jsonify({'message': 'User created, verification email sent'}), 201 @auth_api.route('/verify', methods=['GET']) def verify(): token = request.args.get('token') status = 'success' reason = '' code = '' user = None if not token: status = 'error' reason = 'Missing token' code = MISSING_TOKEN else: user = users_db.get(Query().verify_token == token) if not user: status = 'error' reason = 'Invalid token' code = INVALID_TOKEN else: created_str = user.get('verify_token_created') if not created_str: status = 'error' reason = 'Token timestamp missing' code = TOKEN_TIMESTAMP_MISSING else: created_dt = datetime.fromisoformat(created_str).replace(tzinfo=timezone.utc) if datetime.now(timezone.utc) - created_dt > timedelta(minutes=TOKEN_EXPIRY_MINUTES): status = 'error' reason = 'Token expired' code = TOKEN_EXPIRED else: users_db.update({'verified': True, 'verify_token': None, 'verify_token_created': None}, Query().verify_token == token) http_status = 200 if status == 'success' else 400 if http_status == 200 and user is not None: ##user is verified, create the user's image directory if 'email' not in user: logger.error("Verified user has no email field.") else: user_image_dir = get_user_image_dir(sanitize_email(user['email'])) os.makedirs(user_image_dir, exist_ok=True) return jsonify({'status': status, 'reason': reason, 'code': code}), http_status @auth_api.route('/resend-verify', methods=['POST']) def resend_verify(): data = request.get_json() email = data.get('email') if not email: return jsonify({'error': 'Missing email', 'code': MISSING_EMAIL}), 400 user = users_db.get(UserQuery.email == email) if not user: return jsonify({'error': 'User not found', 'code': USER_NOT_FOUND}), 404 if user.get('verified'): return jsonify({'error': 'Account already verified', 'code': ALREADY_VERIFIED}), 400 token = secrets.token_urlsafe(32) now_iso = datetime.utcnow().isoformat() users_db.update({ 'verify_token': token, 'verify_token_created': now_iso }, UserQuery.email == email) send_verification_email(email, token) return jsonify({'message': 'Verification email resent'}), 200 @auth_api.route('/login', methods=['POST']) def login(): data = request.get_json() email = data.get('email') password = data.get('password') if not email or not password: return jsonify({'error': 'Missing email or password', 'code': MISSING_EMAIL_OR_PASSWORD}), 400 user = users_db.get(UserQuery.email == email) if not user or user.get('password') != password: return jsonify({'error': 'Invalid credentials', 'code': INVALID_CREDENTIALS}), 401 if not user.get('verified'): return jsonify({'error': 'This account has not verified', 'code': NOT_VERIFIED}), 403 payload = { 'email': email, 'exp': datetime.utcnow() + timedelta(hours=24*7) } token = jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256') resp = jsonify({'message': 'Login successful'}) resp.set_cookie('token', token, httponly=True, secure=True, samesite='Strict') return resp, 200 @auth_api.route('/me', methods=['GET']) def me(): token = request.cookies.get('token') if not token: return jsonify({'error': 'Missing token', 'code': MISSING_TOKEN}), 401 try: payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256']) email = payload.get('email') user = users_db.get(UserQuery.email == email) if not user: return jsonify({'error': 'User not found', 'code': USER_NOT_FOUND}), 404 return jsonify({ 'email': user['email'], 'id': sanitize_email(user['email']), 'first_name': user['first_name'], 'last_name': user['last_name'], 'verified': user['verified'] }), 200 except jwt.ExpiredSignatureError: return jsonify({'error': 'Token expired', 'code': TOKEN_EXPIRED}), 401 except jwt.InvalidTokenError: return jsonify({'error': 'Invalid token', 'code': INVALID_TOKEN}), 401 @auth_api.route('/logout', methods=['POST']) def logout(): resp = jsonify({'message': 'Logged out'}) resp.set_cookie('token', '', expires=0, httponly=True, secure=True, samesite='Strict') return resp, 200 @auth_api.route('/request-password-reset', methods=['POST']) def request_password_reset(): data = request.get_json() email = data.get('email') # Always return success for privacy success_msg = 'If this email is registered, you will receive a password reset link shortly.' if not email: return jsonify({'error': 'Missing email', 'code': MISSING_EMAIL}), 400 user = users_db.get(UserQuery.email == email) if user: token = secrets.token_urlsafe(32) now_iso = datetime.utcnow().isoformat() users_db.update({ 'reset_token': token, 'reset_token_created': now_iso }, UserQuery.email == email) send_reset_password_email(email, token) return jsonify({'message': success_msg}), 200 @auth_api.route('/validate-reset-token', methods=['GET']) def validate_reset_token(): token = request.args.get('token') if not token: return jsonify({'error': 'Missing token', 'code': MISSING_TOKEN}), 400 user = users_db.get(UserQuery.reset_token == token) if not user: return jsonify({'error': 'Invalid token', 'code': INVALID_TOKEN}), 400 created_str = user.get('reset_token_created') if not created_str: return jsonify({'error': 'Token timestamp missing', 'code': TOKEN_TIMESTAMP_MISSING}), 400 created_dt = datetime.fromisoformat(created_str).replace(tzinfo=timezone.utc) if datetime.now(timezone.utc) - created_dt > timedelta(minutes=RESET_PASSWORD_TOKEN_EXPIRY_MINUTES): return jsonify({'error': 'Token expired', 'code': TOKEN_EXPIRED}), 400 return jsonify({'message': 'Token is valid'}), 200 @auth_api.route('/reset-password', methods=['POST']) def reset_password(): data = request.get_json() token = data.get('token') new_password = data.get('password') if not token or not new_password: return jsonify({'error': 'Missing token or password'}), 400 user = users_db.get(UserQuery.reset_token == token) if not user: return jsonify({'error': 'Invalid token', 'code': INVALID_TOKEN}), 400 created_str = user.get('reset_token_created') if not created_str: return jsonify({'error': 'Token timestamp missing', 'code': TOKEN_TIMESTAMP_MISSING}), 400 created_dt = datetime.fromisoformat(created_str).replace(tzinfo=timezone.utc) if datetime.now(timezone.utc) - created_dt > timedelta(minutes=RESET_PASSWORD_TOKEN_EXPIRY_MINUTES): return jsonify({'error': 'Token expired', 'code': TOKEN_EXPIRED}), 400 users_db.update({ 'password': new_password, # Hash in production! 'reset_token': None, 'reset_token_created': None }, UserQuery.reset_token == token) return jsonify({'message': 'Password has been reset'}), 200