266 lines
10 KiB
Python
266 lines
10 KiB
Python
import logging
|
|
import secrets, jwt
|
|
from datetime import datetime, timedelta, timezone
|
|
from models.user import User
|
|
from flask import Blueprint, request, jsonify, current_app
|
|
from flask_mail import Mail, Message
|
|
from tinydb import Query
|
|
import os
|
|
|
|
from api.utils import sanitize_email
|
|
from config.paths import get_user_image_dir
|
|
|
|
from api.error_codes import MISSING_FIELDS, EMAIL_EXISTS, MISSING_TOKEN, INVALID_TOKEN, TOKEN_TIMESTAMP_MISSING, \
|
|
TOKEN_EXPIRED, ALREADY_VERIFIED, MISSING_EMAIL, USER_NOT_FOUND, MISSING_EMAIL_OR_PASSWORD, INVALID_CREDENTIALS, \
|
|
NOT_VERIFIED
|
|
from db.db import users_db
|
|
|
|
# Logger named after this module.
logger = logging.getLogger(__name__)

# Blueprint on which the auth routes in this file are registered.
auth_api = Blueprint('auth_api', __name__)

# Shared TinyDB query builder used for user lookups.
UserQuery = Query()

# Flask-Mail instance used by the send_* helpers.
# NOTE(review): presumably bound to the Flask app elsewhere — confirm.
mail = Mail()

# Lifetimes, in minutes, of verification and password-reset tokens.
TOKEN_EXPIRY_MINUTES = 4 * 60
RESET_PASSWORD_TOKEN_EXPIRY_MINUTES = 10
|
|
|
|
|
|
def send_verification_email(to_email, token):
    """Email an account-verification link to ``to_email``.

    The link points at ``<FRONTEND_URL>/auth/verify`` and carries ``token``
    as its ``token`` query parameter. The message is sent through the
    module-level ``mail`` object with ``MAIL_DEFAULT_SENDER`` as the sender.
    """
    config = current_app.config
    link = f"{config['FRONTEND_URL']}/auth/verify?token={token}"
    message = Message(
        subject="Verify your account",
        recipients=[to_email],
        html=f'Click <a href="{link}">here</a> to verify your account.',
        sender=config['MAIL_DEFAULT_SENDER'],
    )
    mail.send(message)
|
|
|
|
def send_reset_password_email(to_email, token):
    """Email a password-reset link to ``to_email``.

    The link points at ``<FRONTEND_URL>/auth/reset-password`` and carries
    ``token`` as its ``token`` query parameter. The message is sent through
    the module-level ``mail`` object with ``MAIL_DEFAULT_SENDER`` as sender.
    """
    config = current_app.config
    link = f"{config['FRONTEND_URL']}/auth/reset-password?token={token}"
    message = Message(
        subject="Reset your password",
        recipients=[to_email],
        html=f'Click <a href="{link}">here</a> to reset your password.',
        sender=config['MAIL_DEFAULT_SENDER'],
    )
    mail.send(message)
|
|
|
|
@auth_api.route('/signup', methods=['POST'])
def signup():
    """Register a new, unverified user and email a verification link.

    Expects a JSON object with ``first_name``, ``last_name``, ``email`` and
    ``password``.

    Returns:
        201 once the user record is inserted and the email has been sent.
        400 with ``MISSING_FIELDS`` when the body is not a JSON object or a
        required field is absent or empty.
        400 with ``EMAIL_EXISTS`` when the email is already registered.
    """
    # silent=True turns a missing or malformed JSON body into None so it is
    # reported with the API's own error shape instead of crashing below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    required_fields = ['first_name', 'last_name', 'email', 'password']
    # Truthiness (not mere presence) so empty strings and nulls are rejected.
    if not all(data.get(field) for field in required_fields):
        return jsonify({'error': 'Missing required fields', 'code': MISSING_FIELDS}), 400

    if users_db.search(UserQuery.email == data['email']):
        return jsonify({'error': 'Email already exists', 'code': EMAIL_EXISTS}), 400

    token = secrets.token_urlsafe(32)
    # Same naive-UTC ISO format the original utcnow() produced, obtained
    # without the deprecated call; verify() re-attaches UTC when parsing.
    now_iso = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    user = User(
        first_name=data['first_name'],
        last_name=data['last_name'],
        email=data['email'],
        # NOTE(review): the password is stored in plaintext. Hashing must be
        # introduced together with login() and reset_password().
        password=data['password'],
        verified=False,
        verify_token=token,
        verify_token_created=now_iso,
        image_id="boy01",
    )
    users_db.insert(user.to_dict())
    send_verification_email(data['email'], token)
    return jsonify({'message': 'User created, verification email sent'}), 201
|
|
|
|
@auth_api.route('/verify', methods=['GET'])
def verify():
    """Confirm an account from the ``token`` query parameter.

    On success the user is marked verified, the verification token and its
    timestamp are cleared, and the user's image directory is created.

    Returns:
        200 with ``status == 'success'``, or 400 with ``status == 'error'``
        plus a ``reason`` and ``code`` for a missing, unknown, undated or
        expired token.
    """
    def failure(reason, code):
        # Every rejection shares the same payload shape and HTTP status.
        return jsonify({'status': 'error', 'reason': reason, 'code': code}), 400

    token = request.args.get('token')
    if not token:
        return failure('Missing token', MISSING_TOKEN)

    record = users_db.get(Query().verify_token == token)
    user = User.from_dict(record) if record else None
    if not user:
        return failure('Invalid token', INVALID_TOKEN)

    issued_at = user.verify_token_created
    if not issued_at:
        return failure('Token timestamp missing', TOKEN_TIMESTAMP_MISSING)

    # The stored timestamp is tagged as UTC before comparing with aware "now".
    issued_dt = datetime.fromisoformat(issued_at).replace(tzinfo=timezone.utc)
    token_age = datetime.now(timezone.utc) - issued_dt
    if token_age > timedelta(minutes=TOKEN_EXPIRY_MINUTES):
        return failure('Token expired', TOKEN_EXPIRED)

    user.verified = True
    user.verify_token = None
    user.verify_token_created = None
    users_db.update(user.to_dict(), Query().email == user.email)

    if user.email:
        image_dir = get_user_image_dir(sanitize_email(user.email))
        os.makedirs(image_dir, exist_ok=True)
    else:
        logger.error("Verified user has no email field.")

    return jsonify({'status': 'success', 'reason': '', 'code': ''}), 200
|
|
|
|
@auth_api.route('/resend-verify', methods=['POST'])
def resend_verify():
    """Issue a fresh verification token and email it to an unverified user.

    Expects a JSON object with ``email``.

    Returns:
        200 once the new token is stored and the email has been sent.
        400 with ``MISSING_EMAIL`` when the body is not a JSON object or has
        no email.
        404 with ``USER_NOT_FOUND`` when no user has that email.
        400 with ``ALREADY_VERIFIED`` when the account is already verified.
    """
    # silent=True yields None for a missing or malformed body; the isinstance
    # check also covers valid JSON that is not an object (list, null, ...).
    data = request.get_json(silent=True)
    email = data.get('email') if isinstance(data, dict) else None
    if not email:
        return jsonify({'error': 'Missing email', 'code': MISSING_EMAIL}), 400

    user_dict = users_db.get(UserQuery.email == email)
    user = User.from_dict(user_dict) if user_dict else None
    if not user:
        return jsonify({'error': 'User not found', 'code': USER_NOT_FOUND}), 404

    if user.verified:
        return jsonify({'error': 'Account already verified', 'code': ALREADY_VERIFIED}), 400

    token = secrets.token_urlsafe(32)
    # Same naive-UTC ISO format the original utcnow() produced, obtained
    # without the deprecated call.
    now_iso = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    user.verify_token = token
    user.verify_token_created = now_iso
    users_db.update(user.to_dict(), UserQuery.email == email)
    send_verification_email(email, token)
    return jsonify({'message': 'Verification email resent'}), 200
|
|
|
|
@auth_api.route('/login', methods=['POST'])
def login():
    """Authenticate a user and set a signed JWT in the ``token`` cookie.

    Expects a JSON object with ``email`` and ``password``.

    Returns:
        200 with an HttpOnly, Secure, SameSite=Strict ``token`` cookie holding
        an HS256 JWT that expires after seven days.
        400 with ``MISSING_EMAIL_OR_PASSWORD`` when the body is not a JSON
        object or either field is absent or empty.
        401 with ``INVALID_CREDENTIALS`` when the user is unknown or the
        password does not match.
        403 with ``NOT_VERIFIED`` when the account is not yet verified.
    """
    # silent=True yields None for a missing or malformed body; non-object
    # JSON is treated the same way so .get() below cannot fail.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    email = data.get('email')
    password = data.get('password')
    if not email or not password:
        return jsonify({'error': 'Missing email or password', 'code': MISSING_EMAIL_OR_PASSWORD}), 400

    user_dict = users_db.get(UserQuery.email == email)
    user = User.from_dict(user_dict) if user_dict else None

    # NOTE(review): passwords are stored and compared in plaintext. Hashing
    # must be introduced together with signup() and reset_password().
    password_ok = False
    if user:
        stored = user.password
        if isinstance(stored, str) and isinstance(password, str):
            # Constant-time comparison; bytes are used because
            # compare_digest rejects non-ASCII str arguments.
            password_ok = secrets.compare_digest(
                stored.encode('utf-8', 'surrogatepass'),
                password.encode('utf-8', 'surrogatepass'),
            )
        else:
            # Non-string values keep the original equality semantics.
            password_ok = stored == password

    if not user or not password_ok:
        return jsonify({'error': 'Invalid credentials', 'code': INVALID_CREDENTIALS}), 401

    if not user.verified:
        return jsonify({'error': 'This account has not verified', 'code': NOT_VERIFIED}), 403

    payload = {
        'email': email,
        # Timezone-aware expiry replaces the deprecated utcnow().
        'exp': datetime.now(timezone.utc) + timedelta(days=7),
    }
    token = jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')

    resp = jsonify({'message': 'Login successful'})
    resp.set_cookie('token', token, httponly=True, secure=True, samesite='Strict')
    return resp, 200
|
|
|
|
@auth_api.route('/me', methods=['GET'])
def me():
    """Return the profile of the user identified by the ``token`` cookie.

    Returns:
        200 with ``email``, ``id``, ``first_name``, ``last_name`` and
        ``verified``.
        401 with ``MISSING_TOKEN``, ``TOKEN_EXPIRED`` or ``INVALID_TOKEN``
        when the cookie is absent, expired or otherwise rejected.
        404 with ``USER_NOT_FOUND`` when the token's email matches no user.
    """
    token = request.cookies.get('token')
    if not token:
        return jsonify({'error': 'Missing token', 'code': MISSING_TOKEN}), 401

    # Only the decode step can raise the JWT errors handled here.
    try:
        claims = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        return jsonify({'error': 'Token expired', 'code': TOKEN_EXPIRED}), 401
    except jwt.InvalidTokenError:
        return jsonify({'error': 'Invalid token', 'code': INVALID_TOKEN}), 401

    record = users_db.get(UserQuery.email == claims.get('email'))
    user = User.from_dict(record) if record else None
    if not user:
        return jsonify({'error': 'User not found', 'code': USER_NOT_FOUND}), 404

    profile = {
        'email': user.email,
        'id': sanitize_email(user.email),
        'first_name': user.first_name,
        'last_name': user.last_name,
        'verified': user.verified,
    }
    return jsonify(profile), 200
|
|
|
|
@auth_api.route('/request-password-reset', methods=['POST'])
def request_password_reset():
    """Start a password reset by emailing a reset link when the user exists.

    Expects a JSON object with ``email``. The success response is the same
    whether or not the email is registered.

    Returns:
        200 with a generic confirmation message.
        400 with ``MISSING_EMAIL`` when the body is not a JSON object or has
        no email.
    """
    # silent=True yields None for a missing or malformed body; the isinstance
    # check also covers valid JSON that is not an object (list, null, ...).
    data = request.get_json(silent=True)
    email = data.get('email') if isinstance(data, dict) else None
    success_msg = 'If this email is registered, you will receive a password reset link shortly.'

    if not email:
        return jsonify({'error': 'Missing email', 'code': MISSING_EMAIL}), 400

    user_dict = users_db.get(UserQuery.email == email)
    user = User.from_dict(user_dict) if user_dict else None
    if user:
        token = secrets.token_urlsafe(32)
        # Same naive-UTC ISO format the original utcnow() produced, obtained
        # without the deprecated call.
        now_iso = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        user.reset_token = token
        user.reset_token_created = now_iso
        users_db.update(user.to_dict(), UserQuery.email == email)
        send_reset_password_email(email, token)

    return jsonify({'message': success_msg}), 200
|
|
|
|
@auth_api.route('/validate-reset-token', methods=['GET'])
def validate_reset_token():
    """Check whether the ``token`` query parameter is a usable reset token.

    Returns:
        200 when the token matches a user and is within
        ``RESET_PASSWORD_TOKEN_EXPIRY_MINUTES`` of its creation time.
        400 with ``MISSING_TOKEN``, ``INVALID_TOKEN``,
        ``TOKEN_TIMESTAMP_MISSING`` or ``TOKEN_EXPIRED`` otherwise.
    """
    def reject(message, code):
        # Every rejection shares the same payload shape and HTTP status.
        return jsonify({'error': message, 'code': code}), 400

    token = request.args.get('token')
    if not token:
        return reject('Missing token', MISSING_TOKEN)

    record = users_db.get(UserQuery.reset_token == token)
    user = User.from_dict(record) if record else None
    if not user:
        return reject('Invalid token', INVALID_TOKEN)

    issued_at = user.reset_token_created
    if not issued_at:
        return reject('Token timestamp missing', TOKEN_TIMESTAMP_MISSING)

    # The stored timestamp is tagged as UTC before comparing with aware "now".
    issued_dt = datetime.fromisoformat(issued_at).replace(tzinfo=timezone.utc)
    max_age = timedelta(minutes=RESET_PASSWORD_TOKEN_EXPIRY_MINUTES)
    if datetime.now(timezone.utc) - issued_dt > max_age:
        return reject('Token expired', TOKEN_EXPIRED)

    return jsonify({'message': 'Token is valid'}), 200
|
|
|
|
@auth_api.route('/reset-password', methods=['POST'])
def reset_password():
    """Set a new password for the user holding a valid reset token.

    Expects a JSON object with ``token`` and ``password``. On success the
    reset token and its timestamp are cleared, so the token works only once.

    Returns:
        200 once the new password is stored.
        400 with ``MISSING_FIELDS`` when the body is not a JSON object or
        either field is absent or empty.
        400 with ``INVALID_TOKEN``, ``TOKEN_TIMESTAMP_MISSING`` or
        ``TOKEN_EXPIRED`` when the token cannot be used.
    """
    # silent=True yields None for a missing or malformed body; non-object
    # JSON is treated the same way so .get() below cannot fail.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    token = data.get('token')
    new_password = data.get('password')

    if not token or not new_password:
        # 'code' added so this error matches every other error response.
        return jsonify({'error': 'Missing token or password', 'code': MISSING_FIELDS}), 400

    user_dict = users_db.get(UserQuery.reset_token == token)
    user = User.from_dict(user_dict) if user_dict else None
    if not user:
        return jsonify({'error': 'Invalid token', 'code': INVALID_TOKEN}), 400

    created_str = user.reset_token_created
    if not created_str:
        return jsonify({'error': 'Token timestamp missing', 'code': TOKEN_TIMESTAMP_MISSING}), 400

    # The stored timestamp is tagged as UTC before comparing with aware "now".
    created_dt = datetime.fromisoformat(created_str).replace(tzinfo=timezone.utc)
    if datetime.now(timezone.utc) - created_dt > timedelta(minutes=RESET_PASSWORD_TOKEN_EXPIRY_MINUTES):
        return jsonify({'error': 'Token expired', 'code': TOKEN_EXPIRED}), 400

    # NOTE(review): the password is stored in plaintext. Hashing must be
    # introduced together with signup() and login().
    user.password = new_password
    user.reset_token = None
    user.reset_token_created = None
    users_db.update(user.to_dict(), UserQuery.email == user.email)

    return jsonify({'message': 'Password has been reset'}), 200
|