Implement account deletion handling and improve user feedback
Some checks failed
Chore App Build and Push Docker Images / build-and-push (push) Has been cancelled

- Added checks for accounts marked for deletion in signup, verification, and password reset processes.
- Updated reward and task listing to sort user-created items first.
- Enhanced user API to clear verification and reset tokens when marking accounts for deletion.
- Introduced tests for marked accounts to ensure proper handling in various scenarios.
- Updated profile and reward edit components to reflect changes in validation and data handling.
This commit is contained in:
2026-02-17 10:38:26 -05:00
parent 3e1715e487
commit 7e7a2ef49e
15 changed files with 724 additions and 35 deletions

View File

@@ -0,0 +1,82 @@
import pytest
from flask import Flask
from api.auth_api import auth_api
from db.db import users_db
from tinydb import Query
from models.user import User
from werkzeug.security import generate_password_hash
from datetime import datetime, timedelta
import jwt
@pytest.fixture
def client():
app = Flask(__name__)
app.register_blueprint(auth_api, url_prefix='/auth')
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'supersecretkey'
with app.test_client() as client:
yield client
def setup_marked_user(email, verified=False, verify_token=None, reset_token=None):
users_db.remove(Query().email == email)
user = User(
first_name='Marked',
last_name='User',
email=email,
password=generate_password_hash('password123'),
verified=verified,
marked_for_deletion=True,
verify_token=verify_token,
verify_token_created=datetime.utcnow().isoformat() if verify_token else None,
reset_token=reset_token,
reset_token_created=datetime.utcnow().isoformat() if reset_token else None
)
users_db.insert(user.to_dict())
def test_signup_marked_for_deletion(client):
setup_marked_user('marked@example.com')
data = {
'first_name': 'Marked',
'last_name': 'User',
'email': 'marked@example.com',
'password': 'password123'
}
response = client.post('/auth/signup', json=data)
assert response.status_code == 403
assert response.json['code'] == 'ACCOUNT_MARKED_FOR_DELETION'
def test_verify_marked_for_deletion(client):
setup_marked_user('marked2@example.com', verify_token='verifytoken123')
response = client.get('/auth/verify', query_string={'token': 'verifytoken123'})
assert response.status_code == 400
assert response.json['code'] == 'ACCOUNT_MARKED_FOR_DELETION'
def test_request_password_reset_marked_for_deletion(client):
setup_marked_user('marked3@example.com')
response = client.post('/auth/request-password-reset', json={'email': 'marked3@example.com'})
assert response.status_code == 403
assert response.json['code'] == 'ACCOUNT_MARKED_FOR_DELETION'
def test_me_marked_for_deletion(client):
email = 'marked4@example.com'
setup_marked_user(email, verified=True)
# Get the user to access the ID
user_dict = users_db.get(Query().email == email)
user = User.from_dict(user_dict)
# Create a valid JWT token for the marked user
payload = {
'email': email,
'user_id': user.id,
'exp': datetime.utcnow() + timedelta(hours=24)
}
token = jwt.encode(payload, 'supersecretkey', algorithm='HS256')
# Make request with token cookie
client.set_cookie('token', token)
response = client.get('/auth/me')
assert response.status_code == 403
assert response.json['code'] == 'ACCOUNT_MARKED_FOR_DELETION'

View File

@@ -15,6 +15,7 @@ from utils.account_deletion_scheduler import (
delete_user_data,
process_deletion_queue,
check_interrupted_deletions,
trigger_deletion_manually,
MAX_DELETION_ATTEMPTS
)
from models.user import User
@@ -953,3 +954,163 @@ class TestIntegration:
assert users_db.get(Query_.id == user_id) is None
assert child_db.get(Query_.id == child_id) is None
assert not os.path.exists(user_image_dir)
class TestManualDeletionTrigger:
"""Tests for manually triggered deletion (admin endpoint)."""
def setup_method(self):
"""Clear test databases before each test."""
users_db.truncate()
child_db.truncate()
task_db.truncate()
reward_db.truncate()
image_db.truncate()
pending_reward_db.truncate()
def teardown_method(self):
"""Clean up test directories after each test."""
for user_id in ['manual_user_1', 'manual_user_2', 'manual_user_3', 'manual_user_retry', 'recent_user']:
user_dir = get_user_image_dir(user_id)
if os.path.exists(user_dir):
try:
shutil.rmtree(user_dir)
except:
pass
def test_manual_trigger_deletes_immediately(self):
"""Test that manual trigger deletes users marked recently (not past threshold)."""
user_id = 'manual_user_1'
# Create user marked only 1 hour ago (well before 720 hour threshold)
marked_time = (datetime.now() - timedelta(hours=1)).isoformat()
user = User(
id=user_id,
email='manual1@example.com',
first_name='Manual',
last_name='Test',
password='hash',
marked_for_deletion=True,
marked_for_deletion_at=marked_time,
deletion_in_progress=False,
deletion_attempted_at=None
)
users_db.insert(user.to_dict())
# Verify user is NOT due for deletion under normal circumstances
assert is_user_due_for_deletion(user) is False
# Manually trigger deletion
result = trigger_deletion_manually()
# Verify user was deleted despite not being past threshold
Query_ = Query()
assert users_db.get(Query_.id == user_id) is None
assert result['triggered'] is True
def test_manual_trigger_deletes_multiple_users(self):
"""Test that manual trigger deletes all marked users regardless of time."""
# Create multiple users marked at different times
users_data = [
('manual_user_1', 1), # 1 hour ago
('manual_user_2', 100), # 100 hours ago
('manual_user_3', 800), # 800 hours ago (past threshold)
]
for user_id, hours_ago in users_data:
marked_time = (datetime.now() - timedelta(hours=hours_ago)).isoformat()
user = User(
id=user_id,
email=f'{user_id}@example.com',
first_name='Manual',
last_name='Test',
password='hash',
marked_for_deletion=True,
marked_for_deletion_at=marked_time,
deletion_in_progress=False,
deletion_attempted_at=None
)
users_db.insert(user.to_dict())
# Verify only one is due under normal circumstances
all_users = users_db.all()
due_count = sum(1 for u in all_users if is_user_due_for_deletion(User.from_dict(u)))
assert due_count == 1 # Only the 800 hour old one
# Manually trigger deletion
trigger_deletion_manually()
# Verify ALL marked users were deleted
Query_ = Query()
assert len(users_db.all()) == 0
def test_manual_trigger_respects_retry_limit(self):
"""Test that manual trigger still respects max retry limit."""
user_id = 'manual_user_retry'
# Create user marked recently with max attempts already
marked_time = (datetime.now() - timedelta(hours=1)).isoformat()
attempted_time = (datetime.now() - timedelta(hours=1)).isoformat()
user = User(
id=user_id,
email='retry@example.com',
first_name='Retry',
last_name='Test',
password='hash',
marked_for_deletion=True,
marked_for_deletion_at=marked_time,
deletion_in_progress=False,
deletion_attempted_at=attempted_time # Has 1 attempt
)
users_db.insert(user.to_dict())
# Mock delete_user_data to fail consistently
with patch('utils.account_deletion_scheduler.delete_user_data', return_value=False):
# Trigger multiple times to exceed retry limit
for _ in range(MAX_DELETION_ATTEMPTS):
trigger_deletion_manually()
# User should still exist after max attempts
Query_ = Query()
remaining_user = users_db.get(Query_.id == user_id)
assert remaining_user is not None
def test_manual_trigger_with_no_marked_users(self):
"""Test that manual trigger handles empty queue gracefully."""
result = trigger_deletion_manually()
assert result['triggered'] is True
assert result['queued_users'] == 0
def test_normal_scheduler_still_respects_threshold(self):
"""Test that normal scheduler run (force=False) still respects time threshold."""
user_id = 'recent_user'
# Create user marked only 1 hour ago
marked_time = (datetime.now() - timedelta(hours=1)).isoformat()
user = User(
id=user_id,
email='recent@example.com',
first_name='Recent',
last_name='Test',
password='hash',
marked_for_deletion=True,
marked_for_deletion_at=marked_time,
deletion_in_progress=False,
deletion_attempted_at=None
)
users_db.insert(user.to_dict())
# Run normal scheduler (not manual trigger)
process_deletion_queue(force=False)
# User should still exist because not past threshold
Query_ = Query()
assert users_db.get(Query_.id == user_id) is not None
# Now run with force=True
process_deletion_queue(force=True)
# User should be deleted
assert users_db.get(Query_.id == user_id) is None

View File

@@ -138,11 +138,12 @@ def test_login_succeeds_for_unmarked_user(client):
assert 'message' in data
def test_password_reset_ignored_for_marked_user(client):
"""Test that password reset requests are silently ignored for marked users."""
"""Test that password reset requests return 403 for marked users."""
response = client.post('/request-password-reset', json={"email": MARKED_EMAIL})
assert response.status_code == 200
assert response.status_code == 403
data = response.get_json()
assert 'message' in data
assert 'error' in data
assert data['code'] == 'ACCOUNT_MARKED_FOR_DELETION'
def test_password_reset_works_for_unmarked_user(client):
"""Test that password reset works normally for unmarked users."""
@@ -167,6 +168,35 @@ def test_mark_for_deletion_updates_timestamp(authenticated_client):
assert before_time <= marked_at <= after_time
def test_mark_for_deletion_clears_tokens(authenticated_client):
"""When an account is marked for deletion, verify/reset tokens must be cleared."""
# Seed verify/reset tokens for the user
UserQuery = Query()
now_iso = datetime.utcnow().isoformat()
users_db.update({
'verify_token': 'verify-abc',
'verify_token_created': now_iso,
'reset_token': 'reset-xyz',
'reset_token_created': now_iso
}, UserQuery.email == TEST_EMAIL)
# Ensure tokens are present before marking
user_before = users_db.search(UserQuery.email == TEST_EMAIL)[0]
assert user_before['verify_token'] is not None
assert user_before['reset_token'] is not None
# Mark account for deletion
response = authenticated_client.post('/user/mark-for-deletion', json={"email": TEST_EMAIL})
assert response.status_code == 200
# Verify tokens were cleared in the DB
user_after = users_db.search(UserQuery.email == TEST_EMAIL)[0]
assert user_after.get('verify_token') is None
assert user_after.get('verify_token_created') is None
assert user_after.get('reset_token') is None
assert user_after.get('reset_token_created') is None
def test_mark_for_deletion_with_invalid_jwt(client):
"""Test marking for deletion with invalid JWT token."""
# Set invalid cookie manually