Some checks failed
Chore App Build and Push Docker Images / build-and-push (push) Has been cancelled
- Added checks for accounts marked for deletion in signup, verification, and password reset processes. - Updated reward and task listing to sort user-created items first. - Enhanced user API to clear verification and reset tokens when marking accounts for deletion. - Introduced tests for marked accounts to ensure proper handling in various scenarios. - Updated profile and reward edit components to reflect changes in validation and data handling.
1117 lines
39 KiB
Python
1117 lines
39 KiB
Python
import os
|
|
import sys
|
|
import pytest
|
|
import shutil
|
|
from datetime import datetime, timedelta
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
# Set up path and environment before imports
|
|
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|
os.environ['DB_ENV'] = 'test'
|
|
|
|
from utils.account_deletion_scheduler import (
|
|
is_user_due_for_deletion,
|
|
get_deletion_attempt_count,
|
|
delete_user_data,
|
|
process_deletion_queue,
|
|
check_interrupted_deletions,
|
|
trigger_deletion_manually,
|
|
MAX_DELETION_ATTEMPTS
|
|
)
|
|
from models.user import User
|
|
from models.child import Child
|
|
from models.task import Task
|
|
from models.reward import Reward
|
|
from models.image import Image
|
|
from models.pending_reward import PendingReward
|
|
from db.db import users_db, child_db, task_db, reward_db, image_db, pending_reward_db
|
|
from config.paths import get_user_image_dir
|
|
from tinydb import Query
|
|
|
|
|
|
class TestSchedulerIdentification:
|
|
"""Tests for identifying users due for deletion."""
|
|
|
|
def setup_method(self):
|
|
"""Clear test databases before each test."""
|
|
users_db.truncate()
|
|
child_db.truncate()
|
|
task_db.truncate()
|
|
reward_db.truncate()
|
|
image_db.truncate()
|
|
pending_reward_db.truncate()
|
|
|
|
def test_user_due_for_deletion(self):
|
|
"""Test scheduler identifies users past the threshold."""
|
|
# Create user marked 800 hours ago (past 720 hour threshold)
|
|
marked_time = (datetime.now() - timedelta(hours=800)).isoformat()
|
|
user = User(
|
|
id='user1',
|
|
first_name='Test',
|
|
last_name='User',
|
|
email='test@example.com',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=marked_time,
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
|
|
assert is_user_due_for_deletion(user) is True
|
|
|
|
def test_user_not_due_for_deletion(self):
|
|
"""Test scheduler ignores users not yet due."""
|
|
# Create user marked 100 hours ago (before 720 hour threshold)
|
|
marked_time = (datetime.now() - timedelta(hours=100)).isoformat()
|
|
user = User(
|
|
id='user2',
|
|
email='test2@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=marked_time,
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
|
|
assert is_user_due_for_deletion(user) is False
|
|
|
|
def test_user_not_marked_for_deletion(self):
|
|
"""Test scheduler ignores users not marked for deletion."""
|
|
user = User(
|
|
id='user3',
|
|
email='test3@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=False,
|
|
marked_for_deletion_at=None,
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
|
|
assert is_user_due_for_deletion(user) is False
|
|
|
|
def test_user_with_invalid_timestamp(self):
|
|
"""Test scheduler handles invalid timestamp gracefully."""
|
|
user = User(
|
|
id='user4',
|
|
email='test4@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at='invalid-timestamp',
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
|
|
assert is_user_due_for_deletion(user) is False
|
|
|
|
def test_empty_database(self):
|
|
"""Test scheduler handles empty database gracefully."""
|
|
# Database is already empty from setup_method
|
|
# Should not raise any errors
|
|
process_deletion_queue()
|
|
|
|
# Verify no users were deleted
|
|
assert len(users_db.all()) == 0
|
|
|
|
|
|
class TestDeletionProcess:
|
|
"""Tests for the deletion process."""
|
|
|
|
def setup_method(self):
|
|
"""Clear test databases and create test data before each test."""
|
|
users_db.truncate()
|
|
child_db.truncate()
|
|
task_db.truncate()
|
|
reward_db.truncate()
|
|
image_db.truncate()
|
|
pending_reward_db.truncate()
|
|
|
|
def teardown_method(self):
|
|
"""Clean up test directories after each test."""
|
|
# Clean up any test user directories
|
|
for user_id in ['deletion_test_user', 'user_no_children', 'user_no_tasks']:
|
|
user_dir = get_user_image_dir(user_id)
|
|
if os.path.exists(user_dir):
|
|
try:
|
|
shutil.rmtree(user_dir)
|
|
except:
|
|
pass
|
|
|
|
def test_deletion_order_pending_rewards_first(self):
|
|
"""Test that pending rewards are deleted before children."""
|
|
user_id = 'deletion_test_user'
|
|
|
|
# Create user
|
|
user = User(
|
|
id=user_id,
|
|
email='delete@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Create child
|
|
child = Child(
|
|
id='child1',
|
|
name='Test Child',
|
|
user_id=user_id,
|
|
points=100,
|
|
tasks=[],
|
|
rewards=[]
|
|
)
|
|
child_db.insert(child.to_dict())
|
|
|
|
# Create pending reward
|
|
pending = PendingReward(
|
|
id='pending1',
|
|
child_id='child1',
|
|
reward_id='reward1',
|
|
user_id=user_id,
|
|
status='pending'
|
|
)
|
|
pending_reward_db.insert(pending.to_dict())
|
|
|
|
# Delete user
|
|
result = delete_user_data(user)
|
|
|
|
assert result is True
|
|
assert len(pending_reward_db.all()) == 0
|
|
assert len(child_db.all()) == 0
|
|
assert len(users_db.all()) == 0
|
|
|
|
def test_deletion_removes_user_tasks_not_system(self):
|
|
"""Test that only user's tasks are deleted, not system tasks."""
|
|
user_id = 'deletion_test_user'
|
|
|
|
# Create user
|
|
user = User(
|
|
id=user_id,
|
|
email='delete@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Create user task
|
|
user_task = Task(
|
|
id='user_task',
|
|
name='User Task',
|
|
points=10,
|
|
is_good=True,
|
|
user_id=user_id
|
|
)
|
|
task_db.insert(user_task.to_dict())
|
|
|
|
# Create system task
|
|
system_task = Task(
|
|
id='system_task',
|
|
name='System Task',
|
|
points=20,
|
|
is_good=True,
|
|
user_id=None
|
|
)
|
|
task_db.insert(system_task.to_dict())
|
|
|
|
# Delete user
|
|
result = delete_user_data(user)
|
|
|
|
assert result is True
|
|
# User task should be deleted
|
|
assert task_db.get(Query().id == 'user_task') is None
|
|
# System task should remain
|
|
assert task_db.get(Query().id == 'system_task') is not None
|
|
assert len(users_db.all()) == 0
|
|
|
|
def test_deletion_removes_user_rewards_not_system(self):
|
|
"""Test that only user's rewards are deleted, not system rewards."""
|
|
user_id = 'deletion_test_user'
|
|
|
|
# Create user
|
|
user = User(
|
|
id=user_id,
|
|
email='delete@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Create user reward
|
|
user_reward = Reward(
|
|
id='user_reward',
|
|
name='User Reward',
|
|
description='A user reward',
|
|
cost=50,
|
|
user_id=user_id
|
|
)
|
|
reward_db.insert(user_reward.to_dict())
|
|
|
|
# Create system reward
|
|
system_reward = Reward(
|
|
id='system_reward',
|
|
name='System Reward',
|
|
description='A system reward',
|
|
cost=100,
|
|
user_id=None
|
|
)
|
|
reward_db.insert(system_reward.to_dict())
|
|
|
|
# Delete user
|
|
result = delete_user_data(user)
|
|
|
|
assert result is True
|
|
# User reward should be deleted
|
|
assert reward_db.get(Query().id == 'user_reward') is None
|
|
# System reward should remain
|
|
assert reward_db.get(Query().id == 'system_reward') is not None
|
|
assert len(users_db.all()) == 0
|
|
|
|
def test_deletion_removes_user_images(self):
|
|
"""Test that user's images are deleted from database."""
|
|
user_id = 'deletion_test_user'
|
|
|
|
# Create user
|
|
user = User(
|
|
id=user_id,
|
|
email='delete@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Create user image
|
|
image = Image(
|
|
id='img1',
|
|
user_id=user_id,
|
|
type=1,
|
|
extension='jpg',
|
|
permanent=False
|
|
)
|
|
image_db.insert(image.to_dict())
|
|
|
|
# Delete user
|
|
result = delete_user_data(user)
|
|
|
|
assert result is True
|
|
assert len(image_db.search(Query().user_id == user_id)) == 0
|
|
assert len(users_db.all()) == 0
|
|
|
|
def test_deletion_with_user_no_children(self):
|
|
"""Test deletion of user with no children."""
|
|
user_id = 'user_no_children'
|
|
|
|
# Create user without children
|
|
user = User(
|
|
id=user_id,
|
|
email='nochildren@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Delete user
|
|
result = delete_user_data(user)
|
|
|
|
assert result is True
|
|
assert len(users_db.all()) == 0
|
|
|
|
def test_deletion_with_user_no_custom_tasks_or_rewards(self):
|
|
"""Test deletion of user with no custom tasks or rewards."""
|
|
user_id = 'user_no_tasks'
|
|
|
|
# Create user
|
|
user = User(
|
|
id=user_id,
|
|
email='notasks@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Delete user
|
|
result = delete_user_data(user)
|
|
|
|
assert result is True
|
|
assert len(users_db.all()) == 0
|
|
|
|
def test_deletion_handles_missing_directory(self):
|
|
"""Test that deletion continues if user directory doesn't exist."""
|
|
user_id = 'user_no_dir'
|
|
|
|
# Create user
|
|
user = User(
|
|
id=user_id,
|
|
email='nodir@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Ensure directory doesn't exist
|
|
user_dir = get_user_image_dir(user_id)
|
|
if os.path.exists(user_dir):
|
|
shutil.rmtree(user_dir)
|
|
|
|
# Delete user (should not fail)
|
|
result = delete_user_data(user)
|
|
|
|
assert result is True
|
|
assert len(users_db.all()) == 0
|
|
|
|
def test_deletion_in_progress_flag(self):
|
|
"""Test that deletion_in_progress flag is set during deletion."""
|
|
user_id = 'deletion_test_user'
|
|
|
|
# Create user
|
|
user = User(
|
|
id=user_id,
|
|
email='flag@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Mock the deletion to fail partway through
|
|
with patch('utils.account_deletion_scheduler.child_db.remove') as mock_remove:
|
|
mock_remove.side_effect = Exception("Test error")
|
|
|
|
result = delete_user_data(user)
|
|
|
|
assert result is False
|
|
# Check that flag was updated
|
|
Query_ = Query()
|
|
updated_user = users_db.get(Query_.id == user_id)
|
|
assert updated_user['deletion_in_progress'] is False
|
|
assert updated_user['deletion_attempted_at'] is not None
|
|
|
|
|
|
class TestRetryLogic:
|
|
"""Tests for deletion retry logic."""
|
|
|
|
def setup_method(self):
|
|
"""Clear test databases before each test."""
|
|
users_db.truncate()
|
|
child_db.truncate()
|
|
task_db.truncate()
|
|
reward_db.truncate()
|
|
image_db.truncate()
|
|
pending_reward_db.truncate()
|
|
|
|
def test_deletion_attempt_count(self):
|
|
"""Test that deletion attempt count is tracked."""
|
|
user_no_attempts = User(
|
|
id='user1',
|
|
email='test1@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
assert get_deletion_attempt_count(user_no_attempts) == 0
|
|
|
|
user_one_attempt = User(
|
|
id='user2',
|
|
email='test2@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=datetime.now().isoformat()
|
|
)
|
|
assert get_deletion_attempt_count(user_one_attempt) == 1
|
|
|
|
def test_max_deletion_attempts_constant(self):
|
|
"""Test that MAX_DELETION_ATTEMPTS is defined correctly."""
|
|
assert MAX_DELETION_ATTEMPTS == 3
|
|
|
|
def test_scheduler_interval_configuration(self):
|
|
"""Test that scheduler is configured to run every 1 hour."""
|
|
from utils.account_deletion_scheduler import start_deletion_scheduler, stop_deletion_scheduler, _scheduler
|
|
|
|
# Clean up any existing scheduler
|
|
stop_deletion_scheduler()
|
|
|
|
# Start the scheduler
|
|
start_deletion_scheduler()
|
|
|
|
# Get the scheduler instance
|
|
from utils import account_deletion_scheduler
|
|
scheduler = account_deletion_scheduler._scheduler
|
|
|
|
# Verify scheduler exists
|
|
assert scheduler is not None, "Scheduler should be initialized"
|
|
|
|
# Get all jobs
|
|
jobs = scheduler.get_jobs()
|
|
assert len(jobs) > 0, "Scheduler should have at least one job"
|
|
|
|
# Find the account deletion job
|
|
deletion_job = None
|
|
for job in jobs:
|
|
if job.id == 'account_deletion':
|
|
deletion_job = job
|
|
break
|
|
|
|
assert deletion_job is not None, "Account deletion job should exist"
|
|
|
|
# Verify the job is configured with interval trigger
|
|
assert hasattr(deletion_job.trigger, 'interval'), "Job should use interval trigger"
|
|
|
|
# Verify interval is 1 hour (3600 seconds)
|
|
interval_seconds = deletion_job.trigger.interval.total_seconds()
|
|
assert interval_seconds == 3600, f"Expected 3600 seconds (1 hour), got {interval_seconds}"
|
|
|
|
# Clean up
|
|
stop_deletion_scheduler()
|
|
|
|
|
|
class TestRestartHandling:
|
|
"""Tests for restart handling."""
|
|
|
|
def setup_method(self):
|
|
"""Clear test databases before each test."""
|
|
users_db.truncate()
|
|
child_db.truncate()
|
|
task_db.truncate()
|
|
reward_db.truncate()
|
|
image_db.truncate()
|
|
pending_reward_db.truncate()
|
|
|
|
def test_interrupted_deletion_recovery(self):
|
|
"""Test that interrupted deletions are detected on restart."""
|
|
# Create user with deletion_in_progress flag set
|
|
user = User(
|
|
id='interrupted_user',
|
|
email='interrupted@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=True,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Check for interrupted deletions
|
|
check_interrupted_deletions()
|
|
|
|
# Verify flag was cleared
|
|
Query_ = Query()
|
|
updated_user = users_db.get(Query_.id == 'interrupted_user')
|
|
assert updated_user['deletion_in_progress'] is False
|
|
|
|
def test_no_interrupted_deletions(self):
|
|
"""Test restart handling when no deletions were interrupted."""
|
|
# Create user without deletion_in_progress flag
|
|
user = User(
|
|
id='normal_user',
|
|
email='normal@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Should not raise any errors
|
|
check_interrupted_deletions()
|
|
|
|
|
|
class TestEdgeCases:
|
|
"""Tests for edge cases and error conditions."""
|
|
|
|
def setup_method(self):
|
|
"""Clear test databases before each test."""
|
|
users_db.truncate()
|
|
child_db.truncate()
|
|
task_db.truncate()
|
|
reward_db.truncate()
|
|
image_db.truncate()
|
|
pending_reward_db.truncate()
|
|
|
|
def test_concurrent_deletion_attempts(self):
|
|
"""Test that deletion_in_progress flag triggers a retry (interrupted deletion)."""
|
|
user_id = 'concurrent_user'
|
|
|
|
# Create user with deletion_in_progress already set (simulating an interrupted deletion)
|
|
user = User(
|
|
id=user_id,
|
|
email='concurrent@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=True,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Try to process deletion queue
|
|
process_deletion_queue()
|
|
|
|
# User should be deleted (scheduler retries interrupted deletions)
|
|
Query_ = Query()
|
|
remaining_user = users_db.get(Query_.id == user_id)
|
|
assert remaining_user is None
|
|
|
|
def test_partial_deletion_failure_continues_with_other_users(self):
|
|
"""Test that failure with one user doesn't stop processing others."""
|
|
# Create two users due for deletion
|
|
user1 = User(
|
|
id='user1',
|
|
email='user1@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user1.to_dict())
|
|
|
|
user2 = User(
|
|
id='user2',
|
|
email='user2@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user2.to_dict())
|
|
|
|
# Mock delete_user_data to fail for first user but succeed for second
|
|
original_delete = delete_user_data
|
|
call_count = [0]
|
|
|
|
def mock_delete(user):
|
|
call_count[0] += 1
|
|
if call_count[0] == 1:
|
|
# Fail first call
|
|
return False
|
|
else:
|
|
# Succeed on subsequent calls
|
|
return original_delete(user)
|
|
|
|
with patch('utils.account_deletion_scheduler.delete_user_data', side_effect=mock_delete):
|
|
process_deletion_queue()
|
|
|
|
# First user should remain (failed)
|
|
Query_ = Query()
|
|
assert users_db.get(Query_.id == 'user1') is not None
|
|
# Second user should be deleted (succeeded)
|
|
# Note: This depends on implementation - if delete_user_data is mocked completely,
|
|
# the user won't actually be removed. This test validates the flow continues.
|
|
|
|
def test_deletion_with_user_no_uploaded_images(self):
|
|
"""Test deletion of user with no uploaded images."""
|
|
user_id = 'user_no_images'
|
|
|
|
# Create user without any images
|
|
user = User(
|
|
id=user_id,
|
|
email='noimages@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Verify no images exist for this user
|
|
Query_ = Query()
|
|
assert len(image_db.search(Query_.user_id == user_id)) == 0
|
|
|
|
# Delete user (should succeed without errors)
|
|
result = delete_user_data(user)
|
|
|
|
assert result is True
|
|
assert len(users_db.all()) == 0
|
|
|
|
def test_user_with_max_failed_attempts(self, caplog):
|
|
"""Test that user with 3+ failed attempts logs critical and is not retried."""
|
|
import logging
|
|
caplog.set_level(logging.CRITICAL)
|
|
|
|
user_id = 'user_max_attempts'
|
|
|
|
# Create user with 3 failed attempts (simulated by having deletion_attempted_at set
|
|
# and mocking get_deletion_attempt_count to return 3)
|
|
user = User(
|
|
id=user_id,
|
|
email='maxattempts@example.com',
|
|
first_name='Test',
|
|
last_name='User',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=datetime.now().isoformat()
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Mock get_deletion_attempt_count to return MAX_DELETION_ATTEMPTS
|
|
with patch('utils.account_deletion_scheduler.get_deletion_attempt_count', return_value=MAX_DELETION_ATTEMPTS):
|
|
process_deletion_queue()
|
|
|
|
# User should still exist (not deleted due to max attempts)
|
|
Query_ = Query()
|
|
remaining_user = users_db.get(Query_.id == user_id)
|
|
assert remaining_user is not None
|
|
|
|
# Check that critical log message was created
|
|
assert any(
|
|
'Manual intervention required' in record.message and user_id in record.message
|
|
for record in caplog.records
|
|
if record.levelno == logging.CRITICAL
|
|
)
|
|
|
|
|
|
class TestIntegration:
|
|
"""Integration tests for complete deletion workflows."""
|
|
|
|
def setup_method(self):
|
|
"""Clear test databases and filesystem before each test."""
|
|
users_db.truncate()
|
|
child_db.truncate()
|
|
task_db.truncate()
|
|
reward_db.truncate()
|
|
image_db.truncate()
|
|
pending_reward_db.truncate()
|
|
|
|
# Clean up test image directories
|
|
from config.paths import get_base_data_dir
|
|
test_image_base = os.path.join(get_base_data_dir(), 'images')
|
|
if os.path.exists(test_image_base):
|
|
for user_dir in os.listdir(test_image_base):
|
|
user_path = os.path.join(test_image_base, user_dir)
|
|
if os.path.isdir(user_path):
|
|
shutil.rmtree(user_path)
|
|
|
|
def teardown_method(self):
|
|
"""Clean up after tests."""
|
|
from config.paths import get_base_data_dir
|
|
test_image_base = os.path.join(get_base_data_dir(), 'images')
|
|
if os.path.exists(test_image_base):
|
|
for user_dir in os.listdir(test_image_base):
|
|
user_path = os.path.join(test_image_base, user_dir)
|
|
if os.path.isdir(user_path):
|
|
shutil.rmtree(user_path)
|
|
|
|
def test_full_deletion_flow_from_marking_to_deletion(self):
|
|
"""Test complete deletion flow from marked user with all associated data."""
|
|
user_id = 'integration_user_1'
|
|
child_id = 'integration_child_1'
|
|
task_id = 'integration_task_1'
|
|
reward_id = 'integration_reward_1'
|
|
image_id = 'integration_image_1'
|
|
pending_reward_id = 'integration_pending_1'
|
|
|
|
# 1. Create user marked for deletion (past threshold)
|
|
user = User(
|
|
id=user_id,
|
|
email='integration@example.com',
|
|
first_name='Integration',
|
|
last_name='Test',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# 2. Create child belonging to user
|
|
child = Child(
|
|
id=child_id,
|
|
user_id=user_id,
|
|
name='Test Child'
|
|
)
|
|
child_db.insert(child.to_dict())
|
|
|
|
# 3. Create pending reward for child
|
|
pending_reward = PendingReward(
|
|
id=pending_reward_id,
|
|
child_id=child_id,
|
|
reward_id='some_reward',
|
|
user_id=user_id,
|
|
status='pending'
|
|
)
|
|
pending_reward_db.insert(pending_reward.to_dict())
|
|
|
|
# 4. Create task created by user
|
|
task = Task(
|
|
id=task_id,
|
|
user_id=user_id,
|
|
name='User Task',
|
|
points=10,
|
|
is_good=True
|
|
)
|
|
task_db.insert(task.to_dict())
|
|
|
|
# 5. Create reward created by user
|
|
reward = Reward(
|
|
id=reward_id,
|
|
user_id=user_id,
|
|
name='User Reward',
|
|
description='Test reward',
|
|
cost=20
|
|
)
|
|
reward_db.insert(reward.to_dict())
|
|
|
|
# 6. Create image uploaded by user
|
|
image = Image(
|
|
id=image_id,
|
|
user_id=user_id,
|
|
type=1, # Type 1 for regular images
|
|
extension='jpg',
|
|
permanent=False
|
|
)
|
|
image_db.insert(image.to_dict())
|
|
|
|
# 7. Create user image directory with a file
|
|
user_image_dir = get_user_image_dir(user_id)
|
|
os.makedirs(user_image_dir, exist_ok=True)
|
|
test_image_path = os.path.join(user_image_dir, 'test.jpg')
|
|
with open(test_image_path, 'w') as f:
|
|
f.write('test image content')
|
|
|
|
# Verify everything exists before deletion
|
|
Query_ = Query()
|
|
assert users_db.get(Query_.id == user_id) is not None
|
|
assert child_db.get(Query_.id == child_id) is not None
|
|
assert pending_reward_db.get(Query_.id == pending_reward_id) is not None
|
|
assert task_db.get(Query_.user_id == user_id) is not None
|
|
assert reward_db.get(Query_.user_id == user_id) is not None
|
|
assert image_db.get(Query_.user_id == user_id) is not None
|
|
assert os.path.exists(user_image_dir)
|
|
assert os.path.exists(test_image_path)
|
|
|
|
# Run the deletion scheduler
|
|
process_deletion_queue()
|
|
|
|
# Verify everything is deleted
|
|
assert users_db.get(Query_.id == user_id) is None
|
|
assert child_db.get(Query_.id == child_id) is None
|
|
assert pending_reward_db.get(Query_.id == pending_reward_id) is None
|
|
assert task_db.get(Query_.user_id == user_id) is None
|
|
assert reward_db.get(Query_.user_id == user_id) is None
|
|
assert image_db.get(Query_.user_id == user_id) is None
|
|
assert not os.path.exists(user_image_dir)
|
|
|
|
def test_multiple_users_deleted_in_same_scheduler_run(self):
|
|
"""Test multiple users are deleted in a single scheduler run."""
|
|
user_ids = ['multi_user_1', 'multi_user_2', 'multi_user_3']
|
|
|
|
# Create 3 users all marked for deletion (past threshold)
|
|
for user_id in user_ids:
|
|
user = User(
|
|
id=user_id,
|
|
email=f'{user_id}@example.com',
|
|
first_name='Multi',
|
|
last_name='Test',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Add a child for each user to verify deletion cascade
|
|
child = Child(
|
|
id=f'child_{user_id}',
|
|
user_id=user_id,
|
|
name=f'Child {user_id}'
|
|
)
|
|
child_db.insert(child.to_dict())
|
|
|
|
# Verify all users exist
|
|
Query_ = Query()
|
|
for user_id in user_ids:
|
|
assert users_db.get(Query_.id == user_id) is not None
|
|
assert child_db.get(Query_.user_id == user_id) is not None
|
|
|
|
# Run scheduler once
|
|
process_deletion_queue()
|
|
|
|
# Verify all users and their children are deleted
|
|
for user_id in user_ids:
|
|
assert users_db.get(Query_.id == user_id) is None
|
|
assert child_db.get(Query_.user_id == user_id) is None
|
|
|
|
def test_deletion_with_restart_midway_recovery(self):
|
|
"""Test deletion recovery when restart happens during deletion."""
|
|
user_id = 'restart_user'
|
|
child_id = 'restart_child'
|
|
|
|
# 1. Create user with deletion_in_progress=True (simulating interrupted deletion)
|
|
user = User(
|
|
id=user_id,
|
|
email='restart@example.com',
|
|
first_name='Restart',
|
|
last_name='Test',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=(datetime.now() - timedelta(hours=800)).isoformat(),
|
|
deletion_in_progress=True, # Interrupted state
|
|
deletion_attempted_at=(datetime.now() - timedelta(hours=2)).isoformat()
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# 2. Create associated data
|
|
child = Child(
|
|
id=child_id,
|
|
user_id=user_id,
|
|
name='Test Child'
|
|
)
|
|
child_db.insert(child.to_dict())
|
|
|
|
# 3. Create user image directory
|
|
user_image_dir = get_user_image_dir(user_id)
|
|
os.makedirs(user_image_dir, exist_ok=True)
|
|
test_image_path = os.path.join(user_image_dir, 'test.jpg')
|
|
with open(test_image_path, 'w') as f:
|
|
f.write('test content')
|
|
|
|
# Verify initial state
|
|
Query_ = Query()
|
|
assert users_db.get(Query_.id == user_id) is not None
|
|
assert users_db.get(Query_.id == user_id)['deletion_in_progress'] is True
|
|
|
|
# 4. Call check_interrupted_deletions (simulating app restart)
|
|
check_interrupted_deletions()
|
|
|
|
# Verify flag was cleared
|
|
updated_user = users_db.get(Query_.id == user_id)
|
|
assert updated_user is not None
|
|
assert updated_user['deletion_in_progress'] is False
|
|
|
|
# 5. Now run the scheduler to complete the deletion
|
|
process_deletion_queue()
|
|
|
|
# Verify user and all data are deleted
|
|
assert users_db.get(Query_.id == user_id) is None
|
|
assert child_db.get(Query_.id == child_id) is None
|
|
assert not os.path.exists(user_image_dir)
|
|
|
|
|
|
class TestManualDeletionTrigger:
|
|
"""Tests for manually triggered deletion (admin endpoint)."""
|
|
|
|
def setup_method(self):
|
|
"""Clear test databases before each test."""
|
|
users_db.truncate()
|
|
child_db.truncate()
|
|
task_db.truncate()
|
|
reward_db.truncate()
|
|
image_db.truncate()
|
|
pending_reward_db.truncate()
|
|
|
|
def teardown_method(self):
|
|
"""Clean up test directories after each test."""
|
|
for user_id in ['manual_user_1', 'manual_user_2', 'manual_user_3', 'manual_user_retry', 'recent_user']:
|
|
user_dir = get_user_image_dir(user_id)
|
|
if os.path.exists(user_dir):
|
|
try:
|
|
shutil.rmtree(user_dir)
|
|
except:
|
|
pass
|
|
|
|
def test_manual_trigger_deletes_immediately(self):
|
|
"""Test that manual trigger deletes users marked recently (not past threshold)."""
|
|
user_id = 'manual_user_1'
|
|
|
|
# Create user marked only 1 hour ago (well before 720 hour threshold)
|
|
marked_time = (datetime.now() - timedelta(hours=1)).isoformat()
|
|
user = User(
|
|
id=user_id,
|
|
email='manual1@example.com',
|
|
first_name='Manual',
|
|
last_name='Test',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=marked_time,
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Verify user is NOT due for deletion under normal circumstances
|
|
assert is_user_due_for_deletion(user) is False
|
|
|
|
# Manually trigger deletion
|
|
result = trigger_deletion_manually()
|
|
|
|
# Verify user was deleted despite not being past threshold
|
|
Query_ = Query()
|
|
assert users_db.get(Query_.id == user_id) is None
|
|
assert result['triggered'] is True
|
|
|
|
def test_manual_trigger_deletes_multiple_users(self):
|
|
"""Test that manual trigger deletes all marked users regardless of time."""
|
|
# Create multiple users marked at different times
|
|
users_data = [
|
|
('manual_user_1', 1), # 1 hour ago
|
|
('manual_user_2', 100), # 100 hours ago
|
|
('manual_user_3', 800), # 800 hours ago (past threshold)
|
|
]
|
|
|
|
for user_id, hours_ago in users_data:
|
|
marked_time = (datetime.now() - timedelta(hours=hours_ago)).isoformat()
|
|
user = User(
|
|
id=user_id,
|
|
email=f'{user_id}@example.com',
|
|
first_name='Manual',
|
|
last_name='Test',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=marked_time,
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Verify only one is due under normal circumstances
|
|
all_users = users_db.all()
|
|
due_count = sum(1 for u in all_users if is_user_due_for_deletion(User.from_dict(u)))
|
|
assert due_count == 1 # Only the 800 hour old one
|
|
|
|
# Manually trigger deletion
|
|
trigger_deletion_manually()
|
|
|
|
# Verify ALL marked users were deleted
|
|
Query_ = Query()
|
|
assert len(users_db.all()) == 0
|
|
|
|
def test_manual_trigger_respects_retry_limit(self):
|
|
"""Test that manual trigger still respects max retry limit."""
|
|
user_id = 'manual_user_retry'
|
|
|
|
# Create user marked recently with max attempts already
|
|
marked_time = (datetime.now() - timedelta(hours=1)).isoformat()
|
|
attempted_time = (datetime.now() - timedelta(hours=1)).isoformat()
|
|
|
|
user = User(
|
|
id=user_id,
|
|
email='retry@example.com',
|
|
first_name='Retry',
|
|
last_name='Test',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=marked_time,
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=attempted_time # Has 1 attempt
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Mock delete_user_data to fail consistently
|
|
with patch('utils.account_deletion_scheduler.delete_user_data', return_value=False):
|
|
# Trigger multiple times to exceed retry limit
|
|
for _ in range(MAX_DELETION_ATTEMPTS):
|
|
trigger_deletion_manually()
|
|
|
|
# User should still exist after max attempts
|
|
Query_ = Query()
|
|
remaining_user = users_db.get(Query_.id == user_id)
|
|
assert remaining_user is not None
|
|
|
|
def test_manual_trigger_with_no_marked_users(self):
|
|
"""Test that manual trigger handles empty queue gracefully."""
|
|
result = trigger_deletion_manually()
|
|
|
|
assert result['triggered'] is True
|
|
assert result['queued_users'] == 0
|
|
|
|
def test_normal_scheduler_still_respects_threshold(self):
|
|
"""Test that normal scheduler run (force=False) still respects time threshold."""
|
|
user_id = 'recent_user'
|
|
|
|
# Create user marked only 1 hour ago
|
|
marked_time = (datetime.now() - timedelta(hours=1)).isoformat()
|
|
user = User(
|
|
id=user_id,
|
|
email='recent@example.com',
|
|
first_name='Recent',
|
|
last_name='Test',
|
|
password='hash',
|
|
marked_for_deletion=True,
|
|
marked_for_deletion_at=marked_time,
|
|
deletion_in_progress=False,
|
|
deletion_attempted_at=None
|
|
)
|
|
users_db.insert(user.to_dict())
|
|
|
|
# Run normal scheduler (not manual trigger)
|
|
process_deletion_queue(force=False)
|
|
|
|
# User should still exist because not past threshold
|
|
Query_ = Query()
|
|
assert users_db.get(Query_.id == user_id) is not None
|
|
|
|
# Now run with force=True
|
|
process_deletion_queue(force=True)
|
|
|
|
# User should be deleted
|
|
assert users_db.get(Query_.id == user_id) is None
|