Files
chore/backend/tests/test_admin_api.py
Ryan Kegel ebaef16daf
All checks were successful
Chore App Build, Test, and Push Docker Images / build-and-push (push) Successful in 3m23s
feat: implement long-term user login with refresh tokens
- Introduced a dual-token system for user authentication: a short-lived access token and a long-lived rotating refresh token.
- Created a new RefreshToken model to manage refresh tokens securely.
- Updated auth_api.py to handle login, refresh, and logout processes with the new token system.
- Enhanced security measures including token rotation and theft detection.
- Updated frontend to handle token refresh on 401 errors and adjusted SSE authentication.
- Removed CORS middleware as it's unnecessary behind the nginx proxy.
- Added tests to ensure functionality and security of the new token system.
2026-03-01 19:27:25 -05:00

450 lines
15 KiB
Python

import os
import sys
import pytest
import jwt
from datetime import datetime, timedelta
from unittest.mock import patch
# Set up path and environment before imports
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
os.environ['DB_ENV'] = 'test'
from main import app
from models.user import User
from db.db import users_db
from config.deletion_config import MIN_THRESHOLD_HOURS, MAX_THRESHOLD_HOURS
from tinydb import Query
from tests.conftest import TEST_SECRET_KEY
@pytest.fixture
def client():
    """Yield a test client bound to the application under test.

    The app's ``TESTING`` config flag is switched on before the client is
    opened; the client is released when the ``with`` block exits after the
    test finishes.
    """
    app.config['TESTING'] = True
    with app.test_client() as test_client:
        yield test_client
@pytest.fixture
def admin_user():
    """Reset the users table to a single admin and return a JWT for it.

    Returns:
        A token signed with ``TEST_SECRET_KEY`` (HS256) whose payload is
        ``{'user_id': 'admin_user'}``. No ``exp`` claim is set.
    """
    users_db.truncate()

    admin_fields = {
        'id': 'admin_user',
        'email': 'admin@example.com',
        'first_name': 'Test',
        'last_name': 'User',
        'password': 'hash',
        'marked_for_deletion': False,
        'marked_for_deletion_at': None,
        'deletion_in_progress': False,
        'deletion_attempted_at': None,
        'role': 'admin',
    }
    users_db.insert(User(**admin_fields).to_dict())

    # The token carries only the user id.
    return jwt.encode({'user_id': 'admin_user'}, TEST_SECRET_KEY, algorithm='HS256')
@pytest.fixture
def setup_deletion_queue():
    """Populate the users table with one admin and three users marked for deletion.

    The table is truncated first. Records are inserted in this order:
    ``admin_user`` (role ``admin``, not marked), ``user1`` (marked 800 hours
    ago), ``user2`` (marked 100 hours ago) and ``user3`` (marked 850 hours
    ago with a deletion attempt already in progress).
    """
    users_db.truncate()

    def hours_ago(hours):
        # ISO-8601 string for a naive local timestamp `hours` in the past.
        return (datetime.now() - timedelta(hours=hours)).isoformat()

    def store(**fields):
        # Every record in this fixture shares the same surname and password.
        users_db.insert(User(last_name='User', password='hash', **fields).to_dict())

    # Admin account that the authenticated tests act as.
    store(
        id='admin_user',
        email='admin@example.com',
        first_name='Admin',
        marked_for_deletion=False,
        marked_for_deletion_at=None,
        deletion_in_progress=False,
        deletion_attempted_at=None,
        role='admin',
    )

    # Marked long ago: intended to be already due for deletion.
    store(
        id='user1',
        email='user1@example.com',
        first_name='Test',
        marked_for_deletion=True,
        marked_for_deletion_at=hours_ago(800),
        deletion_in_progress=False,
        deletion_attempted_at=None,
    )

    # Marked recently: intended to be not yet due.
    store(
        id='user2',
        email='user2@example.com',
        first_name='Test',
        marked_for_deletion=True,
        marked_for_deletion_at=hours_ago(100),
        deletion_in_progress=False,
        deletion_attempted_at=None,
    )

    # Marked long ago and currently flagged as being deleted.
    store(
        id='user3',
        email='user3@example.com',
        first_name='Test',
        marked_for_deletion=True,
        marked_for_deletion_at=hours_ago(850),
        deletion_in_progress=True,
        deletion_attempted_at=datetime.now().isoformat(),
    )
class TestGetDeletionQueue:
    """Tests for GET /admin/deletion-queue endpoint."""

    def test_get_deletion_queue_success(self, client, admin_user, setup_deletion_queue):
        """An authenticated admin sees every user marked for deletion."""
        client.set_cookie('access_token', admin_user)
        response = client.get('/admin/deletion-queue')
        assert response.status_code == 200

        data = response.get_json()
        assert 'count' in data
        assert 'users' in data
        assert data['count'] == 3  # All marked users from setup_deletion_queue

        # Every entry must expose the full set of queue fields.
        expected_keys = (
            'id',
            'email',
            'marked_for_deletion_at',
            'deletion_due_at',
            'deletion_in_progress',
            'deletion_attempted_at',
        )
        for user in data['users']:
            for key in expected_keys:
                assert key in user

    def test_get_deletion_queue_requires_authentication(self, client, setup_deletion_queue):
        """A request without an access_token cookie is rejected with 401."""
        response = client.get('/admin/deletion-queue')
        assert response.status_code == 401

        data = response.get_json()
        assert 'error' in data
        assert data['code'] == 'AUTH_REQUIRED'

    def test_get_deletion_queue_invalid_token(self, client, setup_deletion_queue):
        """A cookie that is not a decodable JWT is rejected with 401."""
        client.set_cookie('access_token', 'invalid_token')
        response = client.get('/admin/deletion-queue')
        assert response.status_code == 401

        data = response.get_json()
        assert 'error' in data
        # The specific error code for a malformed token is deliberately not
        # asserted here; only the 401 status and an error message are required.

    def test_get_deletion_queue_expired_token(self, client, setup_deletion_queue):
        """A correctly signed token whose exp lies in the past is rejected."""
        # Encode exp as an explicit POSIX timestamp. A naive local datetime
        # would be treated as UTC by PyJWT, so on hosts ahead of UTC
        # "one hour ago" could still lie in the future and the token would
        # not count as expired.
        expired_at = int((datetime.now() - timedelta(hours=1)).timestamp())
        expired_token = jwt.encode(
            {'user_id': 'admin_user', 'exp': expired_at},
            TEST_SECRET_KEY,
            algorithm='HS256',
        )
        client.set_cookie('access_token', expired_token)
        response = client.get('/admin/deletion-queue')
        assert response.status_code == 401

        data = response.get_json()
        assert 'error' in data
        assert data['code'] == 'TOKEN_EXPIRED'

    def test_get_deletion_queue_empty(self, client, admin_user):
        """With no users marked for deletion the queue is reported as empty."""
        # Reset explicitly so the precondition (a lone, unmarked admin) is
        # visible in the test itself rather than implied by the fixture.
        users_db.truncate()
        admin = User(
            id='admin_user',
            email='admin@example.com',
            first_name='Test',
            last_name='User',
            password='hash',
            marked_for_deletion=False,
            marked_for_deletion_at=None,
            deletion_in_progress=False,
            deletion_attempted_at=None,
            role='admin',
        )
        users_db.insert(admin.to_dict())

        client.set_cookie('access_token', admin_user)
        response = client.get('/admin/deletion-queue')
        assert response.status_code == 200

        data = response.get_json()
        assert data['count'] == 0
        assert len(data['users']) == 0
class TestGetDeletionThreshold:
    """Tests for GET /admin/deletion-threshold endpoint."""

    def test_get_threshold_success(self, client, admin_user):
        """An admin receives the current threshold together with its bounds."""
        client.set_cookie('access_token', admin_user)
        response = client.get('/admin/deletion-threshold')
        assert response.status_code == 200

        payload = response.get_json()
        for field in ('threshold_hours', 'threshold_min', 'threshold_max'):
            assert field in payload

        # The reported bounds must mirror the constants from deletion_config.
        assert payload['threshold_min'] == MIN_THRESHOLD_HOURS
        assert payload['threshold_max'] == MAX_THRESHOLD_HOURS

    def test_get_threshold_requires_authentication(self, client):
        """Without an access_token cookie the endpoint answers 401 AUTH_REQUIRED."""
        response = client.get('/admin/deletion-threshold')
        assert response.status_code == 401

        payload = response.get_json()
        assert payload['code'] == 'AUTH_REQUIRED'
class TestUpdateDeletionThreshold:
    """Tests for PUT /admin/deletion-threshold endpoint."""

    def test_update_threshold_success(self, client, admin_user):
        """A value inside the allowed range is accepted and echoed back."""
        client.set_cookie('access_token', admin_user)
        response = client.put(
            '/admin/deletion-threshold',
            json={'threshold_hours': 168},
        )
        assert response.status_code == 200

        data = response.get_json()
        assert 'message' in data
        assert data['threshold_hours'] == 168

    def test_update_threshold_validates_minimum(self, client, admin_user):
        """A threshold just below the configured minimum is rejected."""
        client.set_cookie('access_token', admin_user)
        # Derive the boundary from the config constant so the test keeps
        # probing the real lower bound if the configuration changes.
        response = client.put(
            '/admin/deletion-threshold',
            json={'threshold_hours': MIN_THRESHOLD_HOURS - 1},
        )
        assert response.status_code == 400

        data = response.get_json()
        assert 'error' in data
        assert data['code'] == 'THRESHOLD_TOO_LOW'

    def test_update_threshold_validates_maximum(self, client, admin_user):
        """A threshold just above the configured maximum is rejected."""
        client.set_cookie('access_token', admin_user)
        # Derive the boundary from the config constant, as in the minimum test.
        response = client.put(
            '/admin/deletion-threshold',
            json={'threshold_hours': MAX_THRESHOLD_HOURS + 1},
        )
        assert response.status_code == 400

        data = response.get_json()
        assert 'error' in data
        assert data['code'] == 'THRESHOLD_TOO_HIGH'

    def test_update_threshold_missing_value(self, client, admin_user):
        """A body without threshold_hours is rejected with MISSING_THRESHOLD."""
        client.set_cookie('access_token', admin_user)
        response = client.put(
            '/admin/deletion-threshold',
            json={},
        )
        assert response.status_code == 400

        data = response.get_json()
        assert 'error' in data
        assert data['code'] == 'MISSING_THRESHOLD'

    def test_update_threshold_invalid_type(self, client, admin_user):
        """A non-integer threshold is rejected with INVALID_TYPE."""
        client.set_cookie('access_token', admin_user)
        response = client.put(
            '/admin/deletion-threshold',
            json={'threshold_hours': 'invalid'},
        )
        assert response.status_code == 400

        data = response.get_json()
        assert 'error' in data
        assert data['code'] == 'INVALID_TYPE'

    def test_update_threshold_requires_authentication(self, client):
        """Without an access_token cookie the update is refused with 401."""
        response = client.put(
            '/admin/deletion-threshold',
            json={'threshold_hours': 168},
        )
        assert response.status_code == 401
class TestTriggerDeletionQueue:
    """Tests for POST /admin/deletion-queue/trigger endpoint."""

    def test_trigger_deletion_success(self, client, admin_user, setup_deletion_queue):
        """Triggering the queue as an admin returns a processing summary."""
        client.set_cookie('access_token', admin_user)
        response = client.post('/admin/deletion-queue/trigger')
        assert response.status_code == 200

        summary = response.get_json()
        for field in ('message', 'processed', 'deleted', 'failed'):
            assert field in summary

    def test_trigger_deletion_requires_authentication(self, client):
        """Without an access_token cookie the trigger answers 401 AUTH_REQUIRED."""
        response = client.post('/admin/deletion-queue/trigger')
        assert response.status_code == 401

        body = response.get_json()
        assert body['code'] == 'AUTH_REQUIRED'

    def test_trigger_deletion_with_empty_queue(self, client, admin_user):
        """With nobody marked for deletion, zero users are processed."""
        users_db.truncate()

        # Only the admin account exists, and it is not marked for deletion.
        admin_fields = {
            'id': 'admin_user',
            'email': 'admin@example.com',
            'first_name': 'Test',
            'last_name': 'User',
            'password': 'hash',
            'marked_for_deletion': False,
            'marked_for_deletion_at': None,
            'deletion_in_progress': False,
            'deletion_attempted_at': None,
            'role': 'admin',
        }
        users_db.insert(User(**admin_fields).to_dict())

        client.set_cookie('access_token', admin_user)
        response = client.post('/admin/deletion-queue/trigger')
        assert response.status_code == 200

        summary = response.get_json()
        assert summary['processed'] == 0
class TestAdminRoleValidation:
    """Tests for admin role validation."""

    def test_non_admin_user_access(self, client):
        """A user with role 'user' gets 403 ADMIN_REQUIRED from an admin endpoint."""
        users_db.truncate()

        regular_fields = {
            'id': 'regular_user',
            'email': 'user@example.com',
            'first_name': 'Test',
            'last_name': 'User',
            'password': 'hash',
            'marked_for_deletion': False,
            'marked_for_deletion_at': None,
            'deletion_in_progress': False,
            'deletion_attempted_at': None,
            'role': 'user',
        }
        users_db.insert(User(**regular_fields).to_dict())

        # Authenticate as the non-admin account.
        regular_token = jwt.encode(
            {'user_id': 'regular_user'}, TEST_SECRET_KEY, algorithm='HS256'
        )
        client.set_cookie('access_token', regular_token)
        response = client.get('/admin/deletion-queue')

        # Authenticated but not authorised: 403 Forbidden.
        assert response.status_code == 403
        body = response.get_json()
        assert body['code'] == 'ADMIN_REQUIRED'
        assert 'Admin access required' in body['error']

    def test_admin_user_access(self, client):
        """A user with role 'admin' can reach an admin endpoint."""
        users_db.truncate()

        admin_fields = {
            'id': 'admin_user',
            'email': 'admin@example.com',
            'first_name': 'Admin',
            'last_name': 'User',
            'password': 'hash',
            'marked_for_deletion': False,
            'marked_for_deletion_at': None,
            'deletion_in_progress': False,
            'deletion_attempted_at': None,
            'role': 'admin',
        }
        users_db.insert(User(**admin_fields).to_dict())

        # Authenticate as the admin account.
        admin_token = jwt.encode(
            {'user_id': 'admin_user'}, TEST_SECRET_KEY, algorithm='HS256'
        )
        client.set_cookie('access_token', admin_token)
        response = client.get('/admin/deletion-queue')

        # The request should succeed.
        assert response.status_code == 200

    def test_update_threshold_requires_admin(self, client):
        """Updating the deletion threshold as a non-admin yields 403 ADMIN_REQUIRED."""
        users_db.truncate()

        # Non-admin account; the deletion-related fields are left unset here.
        regular = User(
            id='regular_user',
            email='user@example.com',
            first_name='Test',
            last_name='User',
            password='hash',
            role='user',
        )
        users_db.insert(regular.to_dict())

        regular_token = jwt.encode(
            {'user_id': 'regular_user'}, TEST_SECRET_KEY, algorithm='HS256'
        )
        client.set_cookie('access_token', regular_token)
        response = client.put('/admin/deletion-threshold', json={'threshold_hours': 168})

        assert response.status_code == 403
        body = response.get_json()
        assert body['code'] == 'ADMIN_REQUIRED'