All checks were successful
Chore App Build, Test, and Push Docker Images / build-and-push (push) Successful in 3m23s
- Introduced a dual-token system for user authentication: a short-lived access token and a long-lived rotating refresh token.
- Created a new RefreshToken model to manage refresh tokens securely.
- Updated auth_api.py to handle login, refresh, and logout processes with the new token system.
- Enhanced security measures, including token rotation and theft detection.
- Updated the frontend to handle token refresh on 401 errors and adjusted SSE authentication.
- Removed the CORS middleware, as it is unnecessary behind the nginx proxy.
- Added tests to ensure the functionality and security of the new token system.
149 lines
5.7 KiB
Python
149 lines
5.7 KiB
Python
import pytest
|
|
from tests.conftest import TEST_SECRET_KEY, TEST_REFRESH_TOKEN_EXPIRY_DAYS
|
|
import os
|
|
from werkzeug.security import generate_password_hash
|
|
|
|
from flask import Flask
|
|
from api.task_api import task_api
|
|
from api.auth_api import auth_api
|
|
from db.db import task_db, child_db, users_db
|
|
from tinydb import Query
|
|
import jwt
|
|
|
|
|
|
# Credentials of the test account that add_test_user() stores and login_and_set_cookie() logs in with
|
|
TEST_EMAIL = "testuser@example.com"
|
|
TEST_PASSWORD = "testpass"
|
|
|
|
def add_test_user():
    """Reset the test account in ``users_db``.

    Any record already stored under ``TEST_EMAIL`` is removed first, then a
    fresh, verified user is inserted whose password is the hash of
    ``TEST_PASSWORD``.
    """
    by_email = Query().email == TEST_EMAIL
    users_db.remove(by_email)

    account = {
        "id": "testuserid",
        "first_name": "Test",
        "last_name": "User",
        "email": TEST_EMAIL,
        "password": generate_password_hash(TEST_PASSWORD),
        "verified": True,
        "image_id": "boy01",
    }
    users_db.insert(account)
|
|
|
|
def login_and_set_cookie(client):
    """Log the test user in through ``/auth/login`` and check the response.

    Asserts that the login returns 200 and that the ``Set-Cookie`` headers
    contain ``access_token=``. Nothing is returned.

    NOTE(review): later requests presumably rely on the test client keeping
    the cookies it received -- confirm against the Flask test client docs.
    """
    credentials = {"email": TEST_EMAIL, "password": TEST_PASSWORD}
    login_response = client.post('/auth/login', json=credentials)
    assert login_response.status_code == 200

    # Flatten all Set-Cookie headers into one string so a single substring
    # check covers every cookie the server sent.
    set_cookie_headers = login_response.headers.getlist("Set-Cookie")
    all_cookies = ' '.join(set_cookie_headers)
    assert all_cookies and "access_token=" in all_cookies
|
|
|
|
@pytest.fixture
def client():
    """Yield a Flask test client that has already logged in as the test user.

    A bare app is built with ``task_api`` and ``auth_api`` (the latter under
    ``/auth``) and configured with the test secret key and refresh-token
    expiry. The test user is (re)created and logged in before the client is
    handed to the test.
    """
    flask_app = Flask(__name__)
    flask_app.register_blueprint(task_api)
    flask_app.register_blueprint(auth_api, url_prefix='/auth')
    flask_app.config.update(
        TESTING=True,
        SECRET_KEY=TEST_SECRET_KEY,
        REFRESH_TOKEN_EXPIRY_DAYS=TEST_REFRESH_TOKEN_EXPIRY_DAYS,
    )

    with flask_app.test_client() as test_client:
        add_test_user()
        login_and_set_cookie(test_client)
        yield test_client
|
|
|
|
@pytest.fixture(scope="session", autouse=True)
def cleanup_db():
    """Session-scoped, automatic teardown for the task database.

    Everything after the ``yield`` runs at teardown: ``task_db`` is closed
    and ``tasks.json`` is deleted if it exists in the working directory.
    """
    yield

    task_db.close()

    tasks_file = 'tasks.json'
    if not os.path.exists(tasks_file):
        return
    os.remove(tasks_file)
|
|
|
|
def test_add_task(client):
    """``PUT /task/add`` answers 201 and stores the task in ``task_db``.

    Two cases are exercised in order: a task sent without ``image_id``
    (expected to be stored with ``image_id == ''``) and one sent with an
    explicit ``image_id``.
    """
    cases = [
        (
            {'name': 'Clean Room', 'points': 10, 'type': 'chore'},
            b'Task Clean Room added.',
            '',
        ),
        (
            {'name': 'Eat Dinner', 'points': 5, 'type': 'penalty', 'image_id': 'meal'},
            b'Task Eat Dinner added.',
            'meal',
        ),
    ]

    for payload, confirmation, stored_image_id in cases:
        response = client.put('/task/add', json=payload)
        assert response.status_code == 201
        assert confirmation in response.data

        # verify in database
        stored = [
            row for row in task_db.all()
            if row.get('name') == payload['name']
            and row.get('points') == payload['points']
            and row.get('type') == payload['type']
            and row.get('image_id') == stored_image_id
        ]
        assert stored
|
|
|
|
|
|
|
|
def test_list_tasks(client):
    """``GET /task/list`` returns both tasks stored for the test user."""
    task_db.truncate()

    # Insert user-owned tasks
    owned_tasks = (
        {'id': 't1', 'name': 'Task1', 'points': 5, 'type': 'chore', 'user_id': 'testuserid'},
        {'id': 't2', 'name': 'Task2', 'points': 15, 'type': 'penalty', 'image_id': 'meal', 'user_id': 'testuserid'},
    )
    for task in owned_tasks:
        task_db.insert(task)

    listing = client.get('/task/list')
    assert listing.status_code == 200
    assert b'tasks' in listing.data

    payload = listing.json
    assert len(payload['tasks']) == 2
|
|
|
|
|
|
def test_list_tasks_sorted_by_is_good_then_user_then_default_then_name(client):
    """``GET /task/list`` returns tasks in the expected sort order.

    Expected order, as pinned by the assertion below: ``chore`` tasks before
    ``penalty`` tasks; within each type, tasks with ``user_id ==
    'testuserid'`` before tasks with ``user_id`` of ``None``; within each of
    those groups, ascending by name.
    """
    task_db.truncate()

    # (id, name, type, user_id) -- deliberately inserted out of order.
    seed_rows = [
        ('u_good_z', 'Zoo', 'chore', 'testuserid'),
        ('u_good_a', 'Apple', 'chore', 'testuserid'),
        ('d_good_m', 'Mop', 'chore', None),
        ('d_good_b', 'Brush', 'chore', None),
        ('u_bad_c', 'Chore', 'penalty', 'testuserid'),
        ('u_bad_a', 'Alarm', 'penalty', 'testuserid'),
        ('d_bad_y', 'Yell', 'penalty', None),
        ('d_bad_b', 'Bicker', 'penalty', None),
    ]
    for task_id, name, kind, owner in seed_rows:
        task_db.insert({'id': task_id, 'name': name, 'points': 1, 'type': kind, 'user_id': owner})

    response = client.get('/task/list')
    assert response.status_code == 200

    expected_order = [
        'u_good_a',
        'u_good_z',
        'd_good_b',
        'd_good_m',
        'u_bad_a',
        'u_bad_c',
        'd_bad_b',
        'd_bad_y',
    ]
    returned_order = [entry['id'] for entry in response.json['tasks']]
    assert returned_order == expected_order
|
|
|
|
|
|
def test_get_task_not_found(client):
    """``GET /task/<id>`` for an unknown id answers 404 with 'Task not found'."""
    missing = client.get('/task/nonexistent-id')

    assert missing.status_code == 404
    assert b'Task not found' in missing.data
|
|
|
|
def test_delete_task_not_found(client):
    """``DELETE /task/<id>`` for an unknown id answers 404 with 'Task not found'."""
    missing = client.delete('/task/nonexistent-id')

    assert missing.status_code == 404
    assert b'Task not found' in missing.data
|
|
|
|
def test_delete_assigned_task_removes_from_child(client):
    """Deleting a task also removes its id from a child it was assigned to.

    The task and child use fixed ids. Records left under those ids (for
    example by an earlier run against a store that was not wiped) are removed
    before seeding, so every lookup below sees exactly the documents this
    test created rather than a stale duplicate.
    """
    task_id = 't_delete_assigned'
    child_id = 'child_for_task_delete'
    TaskQuery = Query()
    ChildQuery = Query()

    # Remove-then-insert, the same pattern add_test_user() uses, keeps the
    # test repeatable without a patch-up step after seeding.
    task_db.remove(TaskQuery.id == task_id)
    child_db.remove(ChildQuery.id == child_id)

    # create user-owned task and child with the task already assigned
    task_db.insert({'id': task_id, 'name': 'Temp Task', 'points': 5, 'type': 'chore', 'user_id': 'testuserid'})
    child_db.insert({
        'id': child_id,
        'name': 'Frank',
        'age': 7,
        'points': 0,
        'tasks': [task_id],
        'rewards': [],
    })

    # Precondition: exactly one child record exists and it has the task.
    seeded = child_db.search(ChildQuery.id == child_id)
    assert len(seeded) == 1
    assert task_id in seeded[0].get('tasks', [])

    # call the delete endpoint
    resp = client.delete(f'/task/{task_id}')
    assert resp.status_code == 200

    # verify the task id is no longer in the child's tasks
    child = child_db.search(ChildQuery.id == child_id)[0]
    assert task_id not in child.get('tasks', [])