All checks were successful
Chore App Build, Test, and Push Docker Images / build-and-push (push) Successful in 3m23s
- Introduced a dual-token system for user authentication: a short-lived access token and a long-lived rotating refresh token.
- Created a new RefreshToken model to manage refresh tokens securely.
- Updated auth_api.py to handle login, refresh, and logout processes with the new token system.
- Enhanced security measures including token rotation and theft detection.
- Updated frontend to handle token refresh on 401 errors and adjusted SSE authentication.
- Removed CORS middleware as it's unnecessary behind the nginx proxy.
- Added tests to ensure functionality and security of the new token system.
230 lines
8.3 KiB
Python
230 lines
8.3 KiB
Python
import pytest
|
|
from datetime import datetime, timezone
|
|
from flask import Flask
|
|
from api.user_api import user_api
|
|
from api.auth_api import auth_api
|
|
from db.db import users_db
|
|
from tinydb import Query
|
|
import jwt
|
|
from werkzeug.security import generate_password_hash
|
|
from tests.conftest import TEST_SECRET_KEY, TEST_REFRESH_TOKEN_EXPIRY_DAYS
|
|
|
|
# Credentials for the two accounts that add_test_users() inserts.

# Regular account (inserted with marked_for_deletion=False).
TEST_EMAIL = "usertest@example.com"
TEST_PASSWORD = "testpass123"

# Account inserted with marked_for_deletion=True.
MARKED_EMAIL = "marked@example.com"
MARKED_PASSWORD = "markedpass"
|
|
|
|
def add_test_users():
    """Reset the two fixture accounts in ``users_db``.

    Rows matching ``TEST_EMAIL`` or ``MARKED_EMAIL`` are removed first, then
    one regular account and one account already flagged for deletion are
    inserted, so every call leaves the same two records behind.
    """
    # Drop leftovers from earlier runs so the inserts below never duplicate.
    for stale_email in (TEST_EMAIL, MARKED_EMAIL):
        users_db.remove(Query().email == stale_email)

    # Regular, verified account that is not marked for deletion.
    regular_user = {
        "id": "test_user_id",
        "first_name": "Test",
        "last_name": "User",
        "email": TEST_EMAIL,
        "password": generate_password_hash(TEST_PASSWORD),
        "verified": True,
        "image_id": "boy01",
        "marked_for_deletion": False,
        "marked_for_deletion_at": None
    }

    # Verified account that is already marked for deletion.
    marked_user = {
        "id": "marked_user_id",
        "first_name": "Marked",
        "last_name": "User",
        "email": MARKED_EMAIL,
        "password": generate_password_hash(MARKED_PASSWORD),
        "verified": True,
        "image_id": "girl01",
        "marked_for_deletion": True,
        "marked_for_deletion_at": "2024-01-15T10:30:00+00:00"
    }

    for record in (regular_user, marked_user):
        users_db.insert(record)
|
|
|
|
def login_and_get_token(client, email, password):
    """Log in through ``/auth/login`` and return the login response.

    Asserts that the request returned 200 and that the ``Set-Cookie``
    headers contain ``access_token=``. Despite the name, the response object
    is returned rather than a token string.
    """
    credentials = {"email": email, "password": password}
    login_response = client.post('/auth/login', json=credentials)
    assert login_response.status_code == 200

    # Flatten every Set-Cookie header into one string for a substring check.
    set_cookie_blob = ' '.join(login_response.headers.getlist('Set-Cookie'))
    assert 'access_token=' in set_cookie_blob

    # Flask test client automatically handles cookies, so callers do not
    # need the token value itself.
    return login_response
|
|
|
|
@pytest.fixture
def client():
    """Yield a Flask test client with the user and auth blueprints registered.

    The auth blueprint is mounted under ``/auth``. The fixture accounts are
    (re)created via ``add_test_users()`` before the client is handed to the
    test.
    """
    app = Flask(__name__)
    app.register_blueprint(user_api)
    app.register_blueprint(auth_api, url_prefix='/auth')
    app.config.update({
        'TESTING': True,
        'SECRET_KEY': TEST_SECRET_KEY,
        'REFRESH_TOKEN_EXPIRY_DAYS': TEST_REFRESH_TOKEN_EXPIRY_DAYS,
        'FRONTEND_URL': 'http://localhost:5173',  # Needed for email_sender
    })

    with app.test_client() as test_client:
        add_test_users()
        yield test_client
|
|
|
|
@pytest.fixture
def authenticated_client(client):
    """Return ``client`` after logging in as the regular test user."""
    login_and_get_token(client, email=TEST_EMAIL, password=TEST_PASSWORD)
    return client
|
|
|
|
@pytest.fixture
def marked_client(client):
    """Return ``client`` after logging in as the marked-for-deletion user.

    ``login_and_get_token`` asserts a 200 response, so this fixture only
    succeeds if that login is accepted.
    """
    login_and_get_token(client, email=MARKED_EMAIL, password=MARKED_PASSWORD)
    return client
|
|
|
|
def test_mark_user_for_deletion_success(authenticated_client):
    """A logged-in user can mark their own account for deletion."""
    resp = authenticated_client.post(
        '/user/mark-for-deletion',
        json={"email": TEST_EMAIL},
    )
    assert resp.status_code == 200
    assert resp.get_json()['success'] is True

    # The stored record must now carry both the flag and a timestamp.
    stored = users_db.search(Query().email == TEST_EMAIL)[0]
    assert stored['marked_for_deletion'] is True
    raw_timestamp = stored['marked_for_deletion_at']
    assert raw_timestamp is not None

    # fromisoformat() raises on a malformed value; the tzinfo check
    # additionally requires the timestamp to be timezone-aware.
    assert datetime.fromisoformat(raw_timestamp).tzinfo is not None
|
|
|
|
def test_login_for_marked_user_returns_403(client):
    """Logging in as the marked-for-deletion user yields 403 with an error code."""
    credentials = {
        "email": MARKED_EMAIL,
        "password": MARKED_PASSWORD
    }
    resp = client.post('/auth/login', json=credentials)
    assert resp.status_code == 403

    body = resp.get_json()
    assert 'error' in body
    assert body['code'] == 'ACCOUNT_MARKED_FOR_DELETION'
|
|
|
|
def test_mark_for_deletion_requires_auth(client):
    """Without a prior login, the mark-for-deletion endpoint answers 401."""
    resp = client.post(
        '/user/mark-for-deletion',
        json={"email": TEST_EMAIL},
    )
    assert resp.status_code == 401
    assert 'error' in resp.get_json()
|
|
|
|
def test_login_blocked_for_marked_user(client):
    """Login is refused (403, ACCOUNT_MARKED_FOR_DELETION) for a marked account."""
    blocked = client.post(
        '/auth/login',
        json={"email": MARKED_EMAIL, "password": MARKED_PASSWORD},
    )
    assert blocked.status_code == 403

    payload = blocked.get_json()
    assert 'error' in payload
    assert payload['code'] == 'ACCOUNT_MARKED_FOR_DELETION'
|
|
|
|
def test_login_succeeds_for_unmarked_user(client):
    """The regular (unmarked) test user can log in and gets a message back."""
    credentials = {
        "email": TEST_EMAIL,
        "password": TEST_PASSWORD
    }
    resp = client.post('/auth/login', json=credentials)
    assert resp.status_code == 200
    assert 'message' in resp.get_json()
|
|
|
|
def test_password_reset_ignored_for_marked_user(client):
    """A password-reset request for a marked account is rejected with 403."""
    resp = client.post(
        '/auth/request-password-reset',
        json={"email": MARKED_EMAIL},
    )
    assert resp.status_code == 403

    body = resp.get_json()
    assert 'error' in body
    assert body['code'] == 'ACCOUNT_MARKED_FOR_DELETION'
|
|
|
|
def test_password_reset_works_for_unmarked_user(client):
    """A password-reset request for the regular test user returns 200."""
    resp = client.post(
        '/auth/request-password-reset',
        json={"email": TEST_EMAIL},
    )
    assert resp.status_code == 200
    assert 'message' in resp.get_json()
|
|
|
|
def test_mark_for_deletion_updates_timestamp(authenticated_client):
    """The stored deletion timestamp falls within the duration of the request."""
    window_start = datetime.now(timezone.utc)

    resp = authenticated_client.post(
        '/user/mark-for-deletion',
        json={"email": TEST_EMAIL},
    )
    assert resp.status_code == 200

    window_end = datetime.now(timezone.utc)

    # The persisted timestamp must lie between the two readings taken above.
    record = users_db.search(Query().email == TEST_EMAIL)[0]
    stamped = datetime.fromisoformat(record['marked_for_deletion_at'])

    assert window_start <= stamped
    assert stamped <= window_end
|
|
|
|
|
|
def test_mark_for_deletion_clears_tokens(authenticated_client):
    """When an account is marked for deletion, verify/reset tokens must be cleared.

    Seeds verify and reset tokens on the regular test user, confirms they are
    stored, calls the mark-for-deletion endpoint, and then checks that all
    four token fields read back as ``None`` (or are absent).
    """
    # Seed verify/reset tokens for the user
    UserQuery = Query()
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value, unlike the other tests in this
    # file, which use datetime.now(timezone.utc).
    now_iso = datetime.now(timezone.utc).isoformat()
    users_db.update({
        'verify_token': 'verify-abc',
        'verify_token_created': now_iso,
        'reset_token': 'reset-xyz',
        'reset_token_created': now_iso
    }, UserQuery.email == TEST_EMAIL)

    # Ensure tokens are present before marking, so the assertions below
    # cannot pass vacuously.
    user_before = users_db.search(UserQuery.email == TEST_EMAIL)[0]
    assert user_before['verify_token'] is not None
    assert user_before['reset_token'] is not None

    # Mark account for deletion
    response = authenticated_client.post('/user/mark-for-deletion', json={"email": TEST_EMAIL})
    assert response.status_code == 200

    # Verify tokens were cleared in the DB; .get() accepts both a stored
    # None and a removed key.
    user_after = users_db.search(UserQuery.email == TEST_EMAIL)[0]
    assert user_after.get('verify_token') is None
    assert user_after.get('verify_token_created') is None
    assert user_after.get('reset_token') is None
    assert user_after.get('reset_token_created') is None
|
|
|
|
def test_mark_for_deletion_with_invalid_jwt(client):
    """A malformed ``access_token`` cookie is rejected with 401."""
    # Plant a cookie value that is not a valid JWT instead of logging in.
    client.set_cookie('access_token', 'invalid.jwt.token')

    rejection = client.post('/user/mark-for-deletion', json={})
    assert rejection.status_code == 401
    assert 'error' in rejection.get_json()
|
|
|
|
def test_update_profile_success(authenticated_client):
    """PUT ``/user/profile`` updates name and image and persists the change."""
    new_profile = {
        'first_name': 'Updated',
        'last_name': 'Name',
        'image_id': 'new_image'
    }
    resp = authenticated_client.put('/user/profile', json=new_profile)
    assert resp.status_code == 200
    assert resp.get_json()['message'] == 'Profile updated'

    # Read the record back and confirm each field was stored.
    stored = users_db.search(Query().email == TEST_EMAIL)[0]
    assert stored['first_name'] == 'Updated'
    assert stored['last_name'] == 'Name'
    assert stored['image_id'] == 'new_image'
|