"""Tests for the account deletion scheduler.

Covers identification of users due for deletion, the cascading deletion of
a user's data, retry bookkeeping, recovery after an interrupted deletion,
edge cases, end-to-end flows and the manual (forced) trigger.
"""
import logging
import os
import sys
import pytest
import shutil
from datetime import datetime, timedelta
from unittest.mock import patch, MagicMock

# Set up path and environment before imports
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
os.environ['DB_ENV'] = 'test'

from utils.account_deletion_scheduler import (
    is_user_due_for_deletion,
    get_deletion_attempt_count,
    delete_user_data,
    process_deletion_queue,
    check_interrupted_deletions,
    trigger_deletion_manually,
    MAX_DELETION_ATTEMPTS
)
from models.user import User
from models.child import Child
from models.task import Task
from models.reward import Reward
from models.image import Image
from models.pending_reward import PendingReward
from db.db import users_db, child_db, task_db, reward_db, image_db, pending_reward_db
from config.paths import get_user_image_dir
from tinydb import Query


# Age (in hours) used for users that must be past the deletion threshold.
# The tests treat 720 hours as the threshold, so 800 is safely beyond it.
PAST_THRESHOLD_HOURS = 800

# Every database that a test in this module may write to.
_ALL_TEST_DBS = (users_db, child_db, task_db, reward_db, image_db, pending_reward_db)


def _reset_databases():
    """Truncate every test database so each test starts from an empty state."""
    for db in _ALL_TEST_DBS:
        db.truncate()


def _hours_ago(hours):
    """Return the ISO-8601 timestamp of the moment ``hours`` hours before now."""
    return (datetime.now() - timedelta(hours=hours)).isoformat()


def _make_user(user_id, email, *, marked_hours_ago=PAST_THRESHOLD_HOURS, **overrides):
    """Build a ``User`` that is marked for deletion.

    Args:
        user_id: Value for the user's ``id`` field.
        email: Value for the user's ``email`` field.
        marked_hours_ago: How long ago the user was marked for deletion.
        **overrides: Any ``User`` keyword argument that should replace the
            defaults below (e.g. ``deletion_in_progress=True``).

    Returns:
        The constructed ``User`` (not inserted into any database).
    """
    fields = {
        'id': user_id,
        'email': email,
        'first_name': 'Test',
        'last_name': 'User',
        'password': 'hash',
        'marked_for_deletion': True,
        'marked_for_deletion_at': _hours_ago(marked_hours_ago),
        'deletion_in_progress': False,
        'deletion_attempted_at': None,
    }
    fields.update(overrides)
    return User(**fields)


def _insert_user(user_id, email, **kwargs):
    """Build a user with ``_make_user``, store it in ``users_db`` and return it."""
    user = _make_user(user_id, email, **kwargs)
    users_db.insert(user.to_dict())
    return user


def _remove_user_image_dirs(user_ids):
    """Best-effort removal of the image directories of the given users.

    ``ignore_errors=True`` keeps the cleanup best-effort (a missing or
    undeletable directory is not a test failure) without a bare ``except``
    that would also swallow ``KeyboardInterrupt``/``SystemExit``.
    """
    for user_id in user_ids:
        shutil.rmtree(get_user_image_dir(user_id), ignore_errors=True)


def _clear_all_user_image_dirs():
    """Remove every directory found under ``<base data dir>/images``."""
    from config.paths import get_base_data_dir

    image_root = os.path.join(get_base_data_dir(), 'images')
    if not os.path.exists(image_root):
        return
    for entry in os.listdir(image_root):
        entry_path = os.path.join(image_root, entry)
        if os.path.isdir(entry_path):
            shutil.rmtree(entry_path)


def _create_image_file(user_id, content):
    """Create the user's image directory containing a ``test.jpg`` file.

    Returns:
        A ``(directory, file_path)`` tuple.
    """
    user_image_dir = get_user_image_dir(user_id)
    os.makedirs(user_image_dir, exist_ok=True)
    image_path = os.path.join(user_image_dir, 'test.jpg')
    with open(image_path, 'w', encoding='utf-8') as f:
        f.write(content)
    return user_image_dir, image_path


class TestSchedulerIdentification:
    """Tests for identifying users due for deletion."""

    def setup_method(self):
        """Clear test databases before each test."""
        _reset_databases()

    def test_user_due_for_deletion(self):
        """Test scheduler identifies users past the threshold."""
        # Marked 800 hours ago (past 720 hour threshold)
        user = _make_user('user1', 'test@example.com', marked_hours_ago=800)

        assert is_user_due_for_deletion(user) is True

    def test_user_not_due_for_deletion(self):
        """Test scheduler ignores users not yet due."""
        # Marked 100 hours ago (before 720 hour threshold)
        user = _make_user('user2', 'test2@example.com', marked_hours_ago=100)

        assert is_user_due_for_deletion(user) is False

    def test_user_not_marked_for_deletion(self):
        """Test scheduler ignores users not marked for deletion."""
        user = _make_user(
            'user3',
            'test3@example.com',
            marked_for_deletion=False,
            marked_for_deletion_at=None,
        )

        assert is_user_due_for_deletion(user) is False

    def test_user_with_invalid_timestamp(self):
        """Test scheduler handles invalid timestamp gracefully."""
        user = _make_user(
            'user4',
            'test4@example.com',
            marked_for_deletion_at='invalid-timestamp',
        )

        assert is_user_due_for_deletion(user) is False

    def test_empty_database(self):
        """Test scheduler handles empty database gracefully."""
        # Database is already empty from setup_method; this must not raise.
        process_deletion_queue()

        # Verify no users were deleted
        assert len(users_db.all()) == 0


class TestDeletionProcess:
    """Tests for the deletion process."""

    def setup_method(self):
        """Clear test databases before each test."""
        _reset_databases()

    def teardown_method(self):
        """Clean up test directories after each test."""
        _remove_user_image_dirs(['deletion_test_user', 'user_no_children', 'user_no_tasks'])

    def test_deletion_order_pending_rewards_first(self):
        """Test that pending rewards, children and the user are all removed."""
        user_id = 'deletion_test_user'
        user = _insert_user(user_id, 'delete@example.com')

        child = Child(
            id='child1',
            name='Test Child',
            user_id=user_id,
            points=100,
            tasks=[],
            rewards=[]
        )
        child_db.insert(child.to_dict())

        pending = PendingReward(
            id='pending1',
            child_id='child1',
            reward_id='reward1',
            user_id=user_id,
            status='pending'
        )
        pending_reward_db.insert(pending.to_dict())

        result = delete_user_data(user)

        assert result is True
        assert len(pending_reward_db.all()) == 0
        assert len(child_db.all()) == 0
        assert len(users_db.all()) == 0

    def test_deletion_removes_user_tasks_not_system(self):
        """Test that only user's tasks are deleted, not system tasks."""
        user_id = 'deletion_test_user'
        user = _insert_user(user_id, 'delete@example.com')

        user_task = Task(
            id='user_task',
            name='User Task',
            points=10,
            type='chore',
            user_id=user_id
        )
        task_db.insert(user_task.to_dict())

        # A task with user_id=None is a system task.
        system_task = Task(
            id='system_task',
            name='System Task',
            points=20,
            type='chore',
            user_id=None
        )
        task_db.insert(system_task.to_dict())

        result = delete_user_data(user)

        assert result is True
        # User task should be deleted
        assert task_db.get(Query().id == 'user_task') is None
        # System task should remain
        assert task_db.get(Query().id == 'system_task') is not None
        assert len(users_db.all()) == 0

    def test_deletion_removes_user_rewards_not_system(self):
        """Test that only user's rewards are deleted, not system rewards."""
        user_id = 'deletion_test_user'
        user = _insert_user(user_id, 'delete@example.com')

        user_reward = Reward(
            id='user_reward',
            name='User Reward',
            description='A user reward',
            cost=50,
            user_id=user_id
        )
        reward_db.insert(user_reward.to_dict())

        # A reward with user_id=None is a system reward.
        system_reward = Reward(
            id='system_reward',
            name='System Reward',
            description='A system reward',
            cost=100,
            user_id=None
        )
        reward_db.insert(system_reward.to_dict())

        result = delete_user_data(user)

        assert result is True
        # User reward should be deleted
        assert reward_db.get(Query().id == 'user_reward') is None
        # System reward should remain
        assert reward_db.get(Query().id == 'system_reward') is not None
        assert len(users_db.all()) == 0

    def test_deletion_removes_user_images(self):
        """Test that user's images are deleted from database."""
        user_id = 'deletion_test_user'
        user = _insert_user(user_id, 'delete@example.com')

        image = Image(
            id='img1',
            user_id=user_id,
            type=1,
            extension='jpg',
            permanent=False
        )
        image_db.insert(image.to_dict())

        result = delete_user_data(user)

        assert result is True
        assert len(image_db.search(Query().user_id == user_id)) == 0
        assert len(users_db.all()) == 0

    def test_deletion_with_user_no_children(self):
        """Test deletion of user with no children."""
        user = _insert_user('user_no_children', 'nochildren@example.com')

        result = delete_user_data(user)

        assert result is True
        assert len(users_db.all()) == 0

    def test_deletion_with_user_no_custom_tasks_or_rewards(self):
        """Test deletion of user with no custom tasks or rewards."""
        user = _insert_user('user_no_tasks', 'notasks@example.com')

        result = delete_user_data(user)

        assert result is True
        assert len(users_db.all()) == 0

    def test_deletion_handles_missing_directory(self):
        """Test that deletion continues if user directory doesn't exist."""
        user_id = 'user_no_dir'
        user = _insert_user(user_id, 'nodir@example.com')

        # Ensure directory doesn't exist
        user_dir = get_user_image_dir(user_id)
        if os.path.exists(user_dir):
            shutil.rmtree(user_dir)

        # Delete user (should not fail)
        result = delete_user_data(user)

        assert result is True
        assert len(users_db.all()) == 0

    def test_deletion_in_progress_flag(self):
        """Test that a failed deletion clears the flag and records the attempt."""
        user_id = 'deletion_test_user'
        user = _insert_user(user_id, 'flag@example.com')

        # Make the deletion fail partway through
        with patch('utils.account_deletion_scheduler.child_db.remove') as mock_remove:
            mock_remove.side_effect = Exception("Test error")
            result = delete_user_data(user)

        assert result is False

        # Check that flag was updated
        updated_user = users_db.get(Query().id == user_id)
        assert updated_user['deletion_in_progress'] is False
        assert updated_user['deletion_attempted_at'] is not None


class TestRetryLogic:
    """Tests for deletion retry logic."""

    def setup_method(self):
        """Clear test databases before each test."""
        _reset_databases()

    def test_deletion_attempt_count(self):
        """Test that deletion attempt count is tracked."""
        user_no_attempts = _make_user('user1', 'test1@example.com')
        assert get_deletion_attempt_count(user_no_attempts) == 0

        user_one_attempt = _make_user(
            'user2',
            'test2@example.com',
            deletion_attempted_at=datetime.now().isoformat(),
        )
        assert get_deletion_attempt_count(user_one_attempt) == 1

    def test_max_deletion_attempts_constant(self):
        """Test that MAX_DELETION_ATTEMPTS is defined correctly."""
        assert MAX_DELETION_ATTEMPTS == 3

    def test_scheduler_interval_configuration(self):
        """Test that scheduler is configured to run every 1 hour."""
        from utils import account_deletion_scheduler
        from utils.account_deletion_scheduler import (
            start_deletion_scheduler,
            stop_deletion_scheduler,
        )

        # Clean up any existing scheduler
        stop_deletion_scheduler()

        start_deletion_scheduler()
        # try/finally guarantees the scheduler is stopped even when an
        # assertion fails, so it cannot leak into later tests.
        try:
            # Read the instance through the module: a by-value import of
            # ``_scheduler`` would not reflect a later rebinding.
            scheduler = account_deletion_scheduler._scheduler
            assert scheduler is not None, "Scheduler should be initialized"

            jobs = scheduler.get_jobs()
            assert len(jobs) > 0, "Scheduler should have at least one job"

            # Find the account deletion job
            deletion_job = next(
                (job for job in jobs if job.id == 'account_deletion'),
                None,
            )
            assert deletion_job is not None, "Account deletion job should exist"

            # Verify the job is configured with interval trigger
            assert hasattr(deletion_job.trigger, 'interval'), "Job should use interval trigger"

            # Verify interval is 1 hour (3600 seconds)
            interval_seconds = deletion_job.trigger.interval.total_seconds()
            assert interval_seconds == 3600, f"Expected 3600 seconds (1 hour), got {interval_seconds}"
        finally:
            stop_deletion_scheduler()


class TestRestartHandling:
    """Tests for restart handling."""

    def setup_method(self):
        """Clear test databases before each test."""
        _reset_databases()

    def test_interrupted_deletion_recovery(self):
        """Test that interrupted deletions are detected on restart."""
        # User with deletion_in_progress flag set
        _insert_user(
            'interrupted_user',
            'interrupted@example.com',
            deletion_in_progress=True,
        )

        check_interrupted_deletions()

        # Verify flag was cleared
        updated_user = users_db.get(Query().id == 'interrupted_user')
        assert updated_user['deletion_in_progress'] is False

    def test_no_interrupted_deletions(self):
        """Test restart handling when no deletions were interrupted."""
        # User without deletion_in_progress flag
        _insert_user('normal_user', 'normal@example.com')

        # Should not raise any errors
        check_interrupted_deletions()


class TestEdgeCases:
    """Tests for edge cases and error conditions."""

    def setup_method(self):
        """Clear test databases before each test."""
        _reset_databases()

    def test_concurrent_deletion_attempts(self):
        """Test that deletion_in_progress flag triggers a retry (interrupted deletion)."""
        user_id = 'concurrent_user'
        # deletion_in_progress already set (simulating an interrupted deletion)
        _insert_user(user_id, 'concurrent@example.com', deletion_in_progress=True)

        process_deletion_queue()

        # User should be deleted (scheduler retries interrupted deletions)
        assert users_db.get(Query().id == user_id) is None

    def test_partial_deletion_failure_continues_with_other_users(self):
        """Test that failure with one user doesn't stop processing others."""
        # Two users due for deletion
        _insert_user('user1', 'user1@example.com')
        _insert_user('user2', 'user2@example.com')

        # Fail the first delete_user_data call, delegate later calls to the
        # real implementation.
        original_delete = delete_user_data
        call_count = [0]

        def mock_delete(user):
            call_count[0] += 1
            if call_count[0] == 1:
                return False
            return original_delete(user)

        with patch('utils.account_deletion_scheduler.delete_user_data', side_effect=mock_delete):
            process_deletion_queue()

        # First user should remain (failed)
        assert users_db.get(Query().id == 'user1') is not None
        # The queue must keep going after the first failure: a second
        # deletion attempt has to have been made.
        assert call_count[0] >= 2, "Processing should continue after a failed deletion"

    def test_deletion_with_user_no_uploaded_images(self):
        """Test deletion of user with no uploaded images."""
        user_id = 'user_no_images'
        user = _insert_user(user_id, 'noimages@example.com')

        # Verify no images exist for this user
        assert len(image_db.search(Query().user_id == user_id)) == 0

        # Delete user (should succeed without errors)
        result = delete_user_data(user)

        assert result is True
        assert len(users_db.all()) == 0

    def test_user_with_max_failed_attempts(self, caplog):
        """Test that user with 3+ failed attempts logs critical and is not retried."""
        caplog.set_level(logging.CRITICAL)

        user_id = 'user_max_attempts'
        # The attempt count itself is simulated by mocking
        # get_deletion_attempt_count below.
        _insert_user(
            user_id,
            'maxattempts@example.com',
            deletion_attempted_at=datetime.now().isoformat(),
        )

        with patch('utils.account_deletion_scheduler.get_deletion_attempt_count',
                   return_value=MAX_DELETION_ATTEMPTS):
            process_deletion_queue()

        # User should still exist (not deleted due to max attempts)
        assert users_db.get(Query().id == user_id) is not None

        # Check that critical log message was created
        assert any(
            'Manual intervention required' in record.message and user_id in record.message
            for record in caplog.records
            if record.levelno == logging.CRITICAL
        )


class TestIntegration:
    """Integration tests for complete deletion workflows."""

    def setup_method(self):
        """Clear test databases and filesystem before each test."""
        _reset_databases()
        _clear_all_user_image_dirs()

    def teardown_method(self):
        """Clean up after tests."""
        _clear_all_user_image_dirs()

    def test_full_deletion_flow_from_marking_to_deletion(self):
        """Test complete deletion flow from marked user with all associated data."""
        user_id = 'integration_user_1'
        child_id = 'integration_child_1'
        task_id = 'integration_task_1'
        reward_id = 'integration_reward_1'
        image_id = 'integration_image_1'
        pending_reward_id = 'integration_pending_1'

        # 1. Create user marked for deletion (past threshold)
        _insert_user(
            user_id,
            'integration@example.com',
            first_name='Integration',
            last_name='Test',
        )

        # 2. Create child belonging to user
        child = Child(
            id=child_id,
            user_id=user_id,
            name='Test Child'
        )
        child_db.insert(child.to_dict())

        # 3. Create pending reward for child
        pending_reward = PendingReward(
            id=pending_reward_id,
            child_id=child_id,
            reward_id='some_reward',
            user_id=user_id,
            status='pending'
        )
        pending_reward_db.insert(pending_reward.to_dict())

        # 4. Create task created by user
        task = Task(
            id=task_id,
            user_id=user_id,
            name='User Task',
            points=10,
            type='chore'
        )
        task_db.insert(task.to_dict())

        # 5. Create reward created by user
        reward = Reward(
            id=reward_id,
            user_id=user_id,
            name='User Reward',
            description='Test reward',
            cost=20
        )
        reward_db.insert(reward.to_dict())

        # 6. Create image uploaded by user
        image = Image(
            id=image_id,
            user_id=user_id,
            type=1,  # Type 1 for regular images
            extension='jpg',
            permanent=False
        )
        image_db.insert(image.to_dict())

        # 7. Create user image directory with a file
        user_image_dir, test_image_path = _create_image_file(user_id, 'test image content')

        # Verify everything exists before deletion
        Query_ = Query()
        assert users_db.get(Query_.id == user_id) is not None
        assert child_db.get(Query_.id == child_id) is not None
        assert pending_reward_db.get(Query_.id == pending_reward_id) is not None
        assert task_db.get(Query_.user_id == user_id) is not None
        assert reward_db.get(Query_.user_id == user_id) is not None
        assert image_db.get(Query_.user_id == user_id) is not None
        assert os.path.exists(user_image_dir)
        assert os.path.exists(test_image_path)

        # Run the deletion scheduler
        process_deletion_queue()

        # Verify everything is deleted
        assert users_db.get(Query_.id == user_id) is None
        assert child_db.get(Query_.id == child_id) is None
        assert pending_reward_db.get(Query_.id == pending_reward_id) is None
        assert task_db.get(Query_.user_id == user_id) is None
        assert reward_db.get(Query_.user_id == user_id) is None
        assert image_db.get(Query_.user_id == user_id) is None
        assert not os.path.exists(user_image_dir)

    def test_multiple_users_deleted_in_same_scheduler_run(self):
        """Test multiple users are deleted in a single scheduler run."""
        user_ids = ['multi_user_1', 'multi_user_2', 'multi_user_3']

        # Create 3 users all marked for deletion (past threshold)
        for user_id in user_ids:
            _insert_user(
                user_id,
                f'{user_id}@example.com',
                first_name='Multi',
                last_name='Test',
            )

            # Add a child for each user to verify deletion cascade
            child = Child(
                id=f'child_{user_id}',
                user_id=user_id,
                name=f'Child {user_id}'
            )
            child_db.insert(child.to_dict())

        # Verify all users exist
        Query_ = Query()
        for user_id in user_ids:
            assert users_db.get(Query_.id == user_id) is not None
            assert child_db.get(Query_.user_id == user_id) is not None

        # Run scheduler once
        process_deletion_queue()

        # Verify all users and their children are deleted
        for user_id in user_ids:
            assert users_db.get(Query_.id == user_id) is None
            assert child_db.get(Query_.user_id == user_id) is None

    def test_deletion_with_restart_midway_recovery(self):
        """Test deletion recovery when restart happens during deletion."""
        user_id = 'restart_user'
        child_id = 'restart_child'

        # 1. Create user with deletion_in_progress=True (simulating interrupted deletion)
        _insert_user(
            user_id,
            'restart@example.com',
            first_name='Restart',
            last_name='Test',
            deletion_in_progress=True,  # Interrupted state
            deletion_attempted_at=_hours_ago(2),
        )

        # 2. Create associated data
        child = Child(
            id=child_id,
            user_id=user_id,
            name='Test Child'
        )
        child_db.insert(child.to_dict())

        # 3. Create user image directory
        user_image_dir, _ = _create_image_file(user_id, 'test content')

        # Verify initial state
        Query_ = Query()
        assert users_db.get(Query_.id == user_id) is not None
        assert users_db.get(Query_.id == user_id)['deletion_in_progress'] is True

        # 4. Call check_interrupted_deletions (simulating app restart)
        check_interrupted_deletions()

        # Verify flag was cleared
        updated_user = users_db.get(Query_.id == user_id)
        assert updated_user is not None
        assert updated_user['deletion_in_progress'] is False

        # 5. Now run the scheduler to complete the deletion
        process_deletion_queue()

        # Verify user and all data are deleted
        assert users_db.get(Query_.id == user_id) is None
        assert child_db.get(Query_.id == child_id) is None
        assert not os.path.exists(user_image_dir)


class TestManualDeletionTrigger:
    """Tests for manually triggered deletion (admin endpoint)."""

    def setup_method(self):
        """Clear test databases before each test."""
        _reset_databases()

    def teardown_method(self):
        """Clean up test directories after each test."""
        _remove_user_image_dirs([
            'manual_user_1',
            'manual_user_2',
            'manual_user_3',
            'manual_user_retry',
            'recent_user',
        ])

    def test_manual_trigger_deletes_immediately(self):
        """Test that manual trigger deletes users marked recently (not past threshold)."""
        user_id = 'manual_user_1'

        # Marked only 1 hour ago (well before 720 hour threshold)
        user = _insert_user(
            user_id,
            'manual1@example.com',
            marked_hours_ago=1,
            first_name='Manual',
            last_name='Test',
        )

        # Verify user is NOT due for deletion under normal circumstances
        assert is_user_due_for_deletion(user) is False

        # Manually trigger deletion
        result = trigger_deletion_manually()

        # Verify user was deleted despite not being past threshold
        assert users_db.get(Query().id == user_id) is None
        assert result['triggered'] is True

    def test_manual_trigger_deletes_multiple_users(self):
        """Test that manual trigger deletes all marked users regardless of time."""
        # Users marked at different times
        users_data = [
            ('manual_user_1', 1),    # 1 hour ago
            ('manual_user_2', 100),  # 100 hours ago
            ('manual_user_3', 800),  # 800 hours ago (past threshold)
        ]

        for user_id, hours_ago in users_data:
            _insert_user(
                user_id,
                f'{user_id}@example.com',
                marked_hours_ago=hours_ago,
                first_name='Manual',
                last_name='Test',
            )

        # Verify only one is due under normal circumstances
        due_count = sum(
            1 for u in users_db.all()
            if is_user_due_for_deletion(User.from_dict(u))
        )
        assert due_count == 1  # Only the 800 hour old one

        # Manually trigger deletion
        trigger_deletion_manually()

        # Verify ALL marked users were deleted
        assert len(users_db.all()) == 0

    def test_manual_trigger_respects_retry_limit(self):
        """Test that a user survives repeated failing manual triggers."""
        user_id = 'manual_user_retry'

        # Marked recently, with one earlier attempt already recorded
        _insert_user(
            user_id,
            'retry@example.com',
            marked_hours_ago=1,
            first_name='Retry',
            last_name='Test',
            deletion_attempted_at=_hours_ago(1),  # Has 1 attempt
        )

        # Mock delete_user_data to fail consistently
        with patch('utils.account_deletion_scheduler.delete_user_data', return_value=False):
            # Trigger multiple times to exceed retry limit
            for _ in range(MAX_DELETION_ATTEMPTS):
                trigger_deletion_manually()

        # User should still exist after max attempts
        assert users_db.get(Query().id == user_id) is not None

    def test_manual_trigger_with_no_marked_users(self):
        """Test that manual trigger handles empty queue gracefully."""
        result = trigger_deletion_manually()

        assert result['triggered'] is True
        assert result['queued_users'] == 0

    def test_normal_scheduler_still_respects_threshold(self):
        """Test that normal scheduler run (force=False) still respects time threshold."""
        user_id = 'recent_user'

        # Marked only 1 hour ago
        _insert_user(
            user_id,
            'recent@example.com',
            marked_hours_ago=1,
            first_name='Recent',
            last_name='Test',
        )

        # Run normal scheduler (not manual trigger)
        process_deletion_queue(force=False)

        # User should still exist because not past threshold
        Query_ = Query()
        assert users_db.get(Query_.id == user_id) is not None

        # Now run with force=True
        process_deletion_queue(force=True)

        # User should be deleted
        assert users_db.get(Query_.id == user_id) is None