All checks were successful
Chore App Build, Test, and Push Docker Images / build-and-push (push) Successful in 3m23s
- Introduced a dual-token system for user authentication: a short-lived access token and a long-lived rotating refresh token. - Created a new RefreshToken model to manage refresh tokens securely. - Updated auth_api.py to handle login, refresh, and logout processes with the new token system. - Enhanced security measures including token rotation and theft detection. - Updated frontend to handle token refresh on 401 errors and adjusted SSE authentication. - Removed CORS middleware as it's unnecessary behind the nginx proxy. - Added tests to ensure functionality and security of the new token system.
191 lines
6.9 KiB
Python
191 lines
6.9 KiB
Python
import pytest
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from flask import Flask
|
|
from api.auth_api import auth_api
|
|
from db.db import users_db, refresh_tokens_db
|
|
from tinydb import Query
|
|
from models.user import User
|
|
from datetime import datetime
|
|
from tests.conftest import TEST_SECRET_KEY, TEST_REFRESH_TOKEN_EXPIRY_DAYS
|
|
|
|
@pytest.fixture
def client():
    """Yield a Flask test client with the auth blueprint mounted under /auth."""
    flask_app = Flask(__name__)
    flask_app.register_blueprint(auth_api, url_prefix='/auth')
    # Settings supplied to the app for the duration of each test.
    flask_app.config.update(
        TESTING=True,
        SECRET_KEY=TEST_SECRET_KEY,
        REFRESH_TOKEN_EXPIRY_DAYS=TEST_REFRESH_TOKEN_EXPIRY_DAYS,
        FRONTEND_URL='http://localhost:5173',
    )
    with flask_app.test_client() as test_client:
        yield test_client
|
|
|
|
def test_signup_hashes_password(client):
    """Signup must persist a scrypt hash rather than the submitted password."""
    email_matches = Query().email == 'test@example.com'

    # Remove any user left over under this email before signing up.
    users_db.remove(email_matches)

    signup_payload = dict(
        first_name='Test',
        last_name='User',
        email='test@example.com',
        password='password123',
    )
    response = client.post('/auth/signup', json=signup_payload)
    assert response.status_code == 201

    # The stored record must carry a hashed password.
    stored_user = users_db.get(email_matches)
    assert stored_user is not None
    assert stored_user['password'].startswith('scrypt:')
|
|
|
|
def test_login_with_correct_password(client):
    """A correct password yields 200 and both access/refresh token cookies."""
    # Replace any existing record with a verified user whose password is hashed.
    users_db.remove(Query().email == 'test@example.com')
    account = User(
        first_name='Test',
        last_name='User',
        email='test@example.com',
        password=generate_password_hash('password123'),
        verified=True,
    )
    users_db.insert(account.to_dict())

    credentials = {'email': 'test@example.com', 'password': 'password123'}
    response = client.post('/auth/login', json=credentials)
    assert response.status_code == 200

    # Both tokens are expected to arrive as Set-Cookie headers.
    set_cookie_text = ' '.join(response.headers.getlist('Set-Cookie'))
    assert 'access_token=' in set_cookie_text
    assert 'refresh_token=' in set_cookie_text
|
|
|
|
def test_login_with_incorrect_password(client):
    """A wrong password is rejected with 401 and code INVALID_CREDENTIALS."""
    # Replace any existing record with a verified user whose password is hashed.
    users_db.remove(Query().email == 'test@example.com')
    account = User(
        first_name='Test',
        last_name='User',
        email='test@example.com',
        password=generate_password_hash('password123'),
        verified=True,
    )
    users_db.insert(account.to_dict())

    bad_credentials = {'email': 'test@example.com', 'password': 'wrongpassword'}
    response = client.post('/auth/login', json=bad_credentials)

    assert response.status_code == 401
    assert response.json['code'] == 'INVALID_CREDENTIALS'
|
|
|
|
def test_reset_password_hashes_new_password(client):
    """Reset-password must store the new password as a verifiable scrypt hash."""
    email_matches = Query().email == 'test@example.com'

    # Seed a verified user that already holds a freshly issued reset token.
    users_db.remove(email_matches)
    old_hash = generate_password_hash('oldpassword')
    token_issued_at = datetime.utcnow().isoformat()
    account = User(
        first_name='Test',
        last_name='User',
        email='test@example.com',
        password=old_hash,
        verified=True,
        reset_token='validtoken',
        reset_token_created=token_issued_at,
    )
    users_db.insert(account.to_dict())

    reset_payload = {'token': 'validtoken', 'password': 'newpassword123'}
    response = client.post('/auth/reset-password', json=reset_payload)
    assert response.status_code == 200

    # The stored password must be a hash that verifies against the new password.
    stored_user = users_db.get(email_matches)
    assert stored_user is not None
    stored_hash = stored_user['password']
    assert stored_hash.startswith('scrypt:')
    assert check_password_hash(stored_hash, 'newpassword123')
|
|
|
|
|
|
def test_reset_password_invalidates_existing_jwt(client):
    """Test that resetting the password invalidates tokens issued before it.

    Logs in to obtain an access token, resets the password, then checks that
    no refresh tokens remain stored for the user and that the pre-reset
    access token is rejected by /auth/me with 401 / INVALID_TOKEN.
    """
    # Clean up and create a verified user that already holds a reset token
    users_db.remove(Query().email == 'test@example.com')
    user = User(
        first_name='Test',
        last_name='User',
        email='test@example.com',
        password=generate_password_hash('oldpassword123'),
        verified=True,
        reset_token='validtoken2',
        reset_token_created=datetime.utcnow().isoformat(),
    )
    users_db.insert(user.to_dict())

    login_response = client.post(
        '/auth/login',
        json={'email': 'test@example.com', 'password': 'oldpassword123'},
    )
    assert login_response.status_code == 200
    login_cookies = login_response.headers.getlist('Set-Cookie')
    assert 'access_token=' in ' '.join(login_cookies)

    # Extract the old access token value from the first matching Set-Cookie
    # header (everything between 'access_token=' and the first ';').
    old_token = next(
        (
            c.split('access_token=', 1)[1].split(';', 1)[0]
            for c in login_cookies
            if c.startswith('access_token=')
        ),
        None,
    )
    assert old_token

    reset_response = client.post(
        '/auth/reset-password',
        json={'token': 'validtoken2', 'password': 'newpassword123'},
    )
    assert reset_response.status_code == 200
    reset_cookies = reset_response.headers.getlist('Set-Cookie')
    assert 'access_token=' in ' '.join(reset_cookies)

    # Verify all refresh tokens for this user are deleted
    user_dict = users_db.get(Query().email == 'test@example.com')
    # Guard before subscripting so a missing user fails as a clear assertion
    # instead of a TypeError on None.
    assert user_dict is not None
    user_tokens = refresh_tokens_db.search(Query().user_id == user_dict['id'])
    assert len(user_tokens) == 0

    # Set the old token as a cookie and test that it's now invalid
    client.set_cookie('access_token', old_token)
    me_response = client.get('/auth/me')
    assert me_response.status_code == 401
    assert me_response.json['code'] == 'INVALID_TOKEN'
|
|
|
|
def test_migration_script_hashes_plain_text_passwords():
    """Test the migration script hashes plain text passwords.

    Seeds one user with a plain text password and one whose password is
    already hashed, runs ``scripts.hash_passwords.main``, and checks that the
    first password became a verifiable scrypt hash while the second is
    unchanged.
    """
    # Clean up
    users_db.remove(Query().email == 'test1@example.com')
    users_db.remove(Query().email == 'test2@example.com')

    # Create one user with a plain text password...
    user1 = User(
        first_name='Test1',
        last_name='User',
        email='test1@example.com',
        password='plaintext1',
        verified=True,
    )
    # ...and one whose password is already hashed.
    already_hashed = generate_password_hash('alreadyhashed')
    user2 = User(
        first_name='Test2',
        last_name='User',
        email='test2@example.com',
        password=already_hashed,
        verified=True,
    )
    users_db.insert(user1.to_dict())
    users_db.insert(user2.to_dict())

    # Run migration script. The parent of this file's directory is put on
    # sys.path first so that the `scripts` package can be imported.
    import sys
    import os
    sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
    from scripts.hash_passwords import main
    main()

    # Check user1 password is now hashed
    user1_dict = users_db.get(Query().email == 'test1@example.com')
    # Guard before subscripting so a missing record fails as a clear assertion.
    assert user1_dict is not None
    assert user1_dict['password'].startswith('scrypt:')
    assert check_password_hash(user1_dict['password'], 'plaintext1')

    # Check user2 password unchanged
    user2_dict = users_db.get(Query().email == 'test2@example.com')
    assert user2_dict is not None
    assert user2_dict['password'] == already_hashed