"""Background scheduler that permanently deletes accounts marked for deletion.

Users whose record has ``marked_for_deletion`` set are deleted once
``ACCOUNT_DELETION_THRESHOLD_HOURS`` have elapsed since
``marked_for_deletion_at``. Deletion removes the user's pending rewards,
children, tasks, rewards, image records and image directory, and finally the
user record itself.
"""
import logging
import os
import shutil
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler

from apscheduler.schedulers.background import BackgroundScheduler
from tinydb import Query

from config.deletion_config import ACCOUNT_DELETION_THRESHOLD_HOURS
from config.paths import get_user_image_dir
from db.db import users_db, child_db, task_db, reward_db, image_db, pending_reward_db
from models.user import User
from events.types.event import Event
from events.types.event_types import EventType
from events.types.user_deleted import UserDeleted
from events.sse import send_to_user

# Setup dedicated logger for account deletion
logger = logging.getLogger('account_deletion_scheduler')
logger.setLevel(logging.INFO)

# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)

# Add rotating file handler
file_handler = RotatingFileHandler(
    'logs/account_deletion.log',
    maxBytes=10*1024*1024,  # 10MB
    backupCount=5
)
file_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
logger.addHandler(file_handler)

# Also log to the console. NOTE: StreamHandler() with no argument writes to
# sys.stderr, not stdout.
console_handler = logging.StreamHandler()
console_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
logger.addHandler(console_handler)

# Upper bound on failed deletion attempts before a user is skipped and
# flagged for manual intervention (see process_deletion_queue).
MAX_DELETION_ATTEMPTS = 3


def send_user_deleted_event_to_admins(user_id: str, email: str, deleted_at: str) -> None:
    """
    Build a USER_DELETED event for a deleted user and log it.

    Nothing is broadcast yet: there is currently no way to enumerate admin
    users or their active SSE connections, so the event is only constructed
    and a log line is written.

    TODO: Once an admin role system exists, send the event only to admins.

    Args:
        user_id: ID of the deleted user.
        email: Email address of the deleted user.
        deleted_at: ISO-8601 timestamp of the deletion.
    """
    # Built now so the payload is ready for the future broadcast below; it is
    # intentionally not sent anywhere yet.
    event = Event(
        EventType.USER_DELETED.value,
        UserDeleted(user_id, email, deleted_at)
    )

    logger.info(f"USER_DELETED event created for {user_id} ({email}) at {deleted_at}")

    # Future implementation:
    # admin_users = get_admin_users()
    # for admin in admin_users:
    #     send_to_user(admin.id, event.to_dict())


def is_user_due_for_deletion(user: User) -> bool:
    """
    Check whether a user has passed the deletion threshold.

    A user is due when ``marked_for_deletion_at`` plus
    ``ACCOUNT_DELETION_THRESHOLD_HOURS`` is not in the future.

    Args:
        user: The user to check.

    Returns:
        True if the user is marked and the threshold has elapsed. False if
        the user is not marked, has no timestamp, or the timestamp cannot
        be parsed (an error is logged in that case).
    """
    if not user.marked_for_deletion or not user.marked_for_deletion_at:
        return False

    try:
        marked_at = datetime.fromisoformat(user.marked_for_deletion_at)
        due_at = marked_at + timedelta(hours=ACCOUNT_DELETION_THRESHOLD_HOURS)

        # Compare like with like: datetime.now(None) yields naive local time
        # for naive timestamps, and datetime.now(tz) yields an aware "now" in
        # the timestamp's own zone for aware ones. Simply stripping tzinfo
        # would compare a foreign wall-clock time against local time.
        now = datetime.now(marked_at.tzinfo)
        return now >= due_at
    except (ValueError, TypeError) as e:
        logger.error(f"Error parsing marked_for_deletion_at for user {user.id}: {e}")
        return False


def get_deletion_attempt_count(user: User) -> int:
    """
    Return the number of recorded deletion attempts for a user.

    This is a simplified version: only the presence of
    ``deletion_attempted_at`` is known, so the result is 1 if that field is
    set and 0 otherwise.

    NOTE(review): because this never returns more than 1, the
    ``MAX_DELETION_ATTEMPTS`` guard in ``process_deletion_queue`` cannot
    trigger while that constant is greater than 1. Tracking a real counter
    needs a dedicated field on the user record.
    """
    return 1 if user.deletion_attempted_at else 0


def _mark_deletion_failed(user_id) -> None:
    """Clear deletion_in_progress and stamp deletion_attempted_at (best effort)."""
    try:
        users_db.update({
            'deletion_in_progress': False,
            'deletion_attempted_at': datetime.now().isoformat()
        }, Query().id == user_id)
    except Exception as e:
        # Best effort only: the caller is already on a failure path.
        logger.error(f"Failed to update deletion flags for user {user_id}: {e}")


def _remove_pending_rewards(user_id) -> bool:
    """Remove pending rewards of every child of the user; False on error."""
    try:
        children = child_db.search(Query().user_id == user_id)
        child_ids = [child['id'] for child in children]
        for child_id in child_ids:
            removed = pending_reward_db.remove(Query().child_id == child_id)
            if removed:
                logger.info(f"Deleted {len(removed)} pending rewards for child {child_id}")
    except Exception as e:
        logger.error(f"Failed to delete pending rewards for user {user_id}: {e}")
        return False
    return True


def _remove_owned_records(table, user_id, label: str) -> bool:
    """Remove all rows of `table` whose user_id matches; False on error.

    `label` is the human-readable record name used in the log messages.
    """
    try:
        removed = table.remove(Query().user_id == user_id)
        if removed:
            logger.info(f"Deleted {len(removed)} {label} for user {user_id}")
    except Exception as e:
        logger.error(f"Failed to delete {label} for user {user_id}: {e}")
        return False
    return True


def _remove_image_directory(user_id) -> bool:
    """Delete the user's image directory from the filesystem; False on error."""
    try:
        user_image_dir = get_user_image_dir(user_id)
        if os.path.exists(user_image_dir):
            shutil.rmtree(user_image_dir)
            logger.info(f"Deleted image directory for user {user_id}")
        else:
            logger.info(f"Image directory for user {user_id} does not exist (already deleted or never created)")
    except Exception as e:
        logger.error(f"Failed to delete image directory for user {user_id}: {e}")
        return False
    return True


def delete_user_data(user: User) -> bool:
    """
    Delete all data associated with a user in the correct order.

    Order: pending rewards of the user's children, children, tasks, rewards,
    image records, image directory, and finally the user record. Every
    cleanup step is attempted even if an earlier one failed; the user record
    is only removed when all of them succeeded.

    On any failure the user's ``deletion_in_progress`` flag is cleared and
    ``deletion_attempted_at`` is stamped so a later run can retry.

    Args:
        user: The user to delete.

    Returns:
        True if the user record was removed, False otherwise. This function
        logs errors instead of raising.
    """
    user_id = user.id

    try:
        # Step 1: Set deletion_in_progress flag
        logger.info(f"Starting deletion for user {user_id} ({user.email})")
        users_db.update({'deletion_in_progress': True}, Query().id == user_id)

        # Steps 2-7, evaluated in order. Pending rewards go first because
        # they are located through the child records removed right after.
        step_results = [
            _remove_pending_rewards(user_id),
            _remove_owned_records(child_db, user_id, 'children'),
            _remove_owned_records(task_db, user_id, 'tasks'),
            _remove_owned_records(reward_db, user_id, 'rewards'),
            _remove_owned_records(image_db, user_id, 'images from database'),
            _remove_image_directory(user_id),
        ]

        if not all(step_results):
            logger.error(f"Deletion failed for user {user_id}, marking for retry")
            _mark_deletion_failed(user_id)
            return False

        # Step 8: Remove user record
        try:
            users_db.remove(Query().id == user_id)
        except Exception as e:
            logger.error(f"Failed to delete user record for {user_id}: {e}")
            # Keep this path consistent with the other failures so the
            # in-progress flag does not stay stuck on True.
            _mark_deletion_failed(user_id)
            return False
    except Exception as e:
        logger.error(f"Unexpected error during deletion for user {user_id}: {e}")
        _mark_deletion_failed(user_id)
        return False

    deleted_at = datetime.now().isoformat()
    logger.info(f"Successfully deleted user {user_id} ({user.email})")

    # The user is already gone at this point; a notification problem must
    # not be reported as a failed deletion.
    try:
        send_user_deleted_event_to_admins(user_id, user.email, deleted_at)
    except Exception as e:
        logger.error(f"Failed to send USER_DELETED event for {user_id}: {e}")

    return True


def process_deletion_queue(force=False) -> None:
    """
    Process the deletion queue: find users due for deletion and delete them.

    Args:
        force (bool): If True, delete all marked users immediately without
            checking threshold. If False, only delete users past the
            threshold time.

    The summary log counts every marked user as "processed", including those
    that were skipped because they are not yet due.
    """
    if force:
        logger.info("Starting FORCED deletion scheduler run (bypassing time threshold)")
    else:
        logger.info("Starting deletion scheduler run")

    processed = 0
    deleted = 0
    failed = 0

    try:
        # Get all marked users. TinyDB builds the query from the `==`
        # operator, so `is True` cannot be used here.
        marked_users = users_db.search(Query().marked_for_deletion == True)  # noqa: E712

        if not marked_users:
            logger.info("No users marked for deletion")
            return

        logger.info(f"Found {len(marked_users)} users marked for deletion")

        for user_dict in marked_users:
            processed += 1

            # Guard each user individually so one malformed record does not
            # abort the whole run.
            try:
                user = User.from_dict(user_dict)

                # Check if user is due for deletion (skip check if force=True)
                if not force and not is_user_due_for_deletion(user):
                    continue

                # Check retry limit
                attempt_count = get_deletion_attempt_count(user)
                if attempt_count >= MAX_DELETION_ATTEMPTS:
                    logger.critical(
                        f"User {user.id} ({user.email}) has failed deletion {attempt_count} times. "
                        "Manual intervention required."
                    )
                    continue

                # A leftover in-progress flag is only warned about; the
                # deletion is retried rather than skipped.
                if user.deletion_in_progress:
                    logger.warning(
                        f"User {user.id} ({user.email}) has deletion_in_progress=True. "
                        "This may indicate a previous run was interrupted. Retrying..."
                    )

                # Attempt deletion
                if delete_user_data(user):
                    deleted += 1
                else:
                    failed += 1
            except Exception as e:
                failed += 1
                logger.error(f"Error processing user marked for deletion: {e}")

        logger.info(
            f"Deletion scheduler run complete: "
            f"{processed} users processed, {deleted} deleted, {failed} failed"
        )

    except Exception as e:
        logger.error(f"Error in deletion scheduler: {e}")


def check_interrupted_deletions() -> None:
    """
    On startup, reset the in-progress flag of interrupted deletions.

    Users that are marked for deletion and still have
    ``deletion_in_progress=True`` get that flag cleared. The deletion itself
    is not retried here; it is left to the next scheduler run. Errors are
    logged, not raised.
    """
    logger.info("Checking for interrupted deletions from previous runs")

    try:
        Query_ = Query()
        interrupted_users = users_db.search(
            (Query_.marked_for_deletion == True) &  # noqa: E712
            (Query_.deletion_in_progress == True)  # noqa: E712
        )

        if interrupted_users:
            logger.warning(
                f"Found {len(interrupted_users)} users with interrupted deletions. "
                "Will retry on next scheduler run."
            )

            # Reset the flag so they can be retried
            for user_dict in interrupted_users:
                users_db.update(
                    {'deletion_in_progress': False},
                    Query_.id == user_dict['id']
                )
    except Exception as e:
        logger.error(f"Error checking for interrupted deletions: {e}")


# Global scheduler instance (None while the scheduler is not running)
_scheduler = None


def start_deletion_scheduler() -> None:
    """
    Start the background deletion scheduler.

    Should be called once during application startup. A second call while
    the scheduler is running only logs a warning. Interrupted deletions are
    reset first, then ``process_deletion_queue`` is scheduled every hour.
    """
    global _scheduler

    if _scheduler is not None:
        logger.warning("Deletion scheduler is already running")
        return

    logger.info("Starting account deletion scheduler")

    # Check for interrupted deletions from previous runs
    check_interrupted_deletions()

    # Create and start scheduler
    _scheduler = BackgroundScheduler()

    # Run every hour
    _scheduler.add_job(
        process_deletion_queue,
        'interval',
        hours=1,
        id='account_deletion',
        name='Account Deletion Scheduler',
        replace_existing=True
    )

    _scheduler.start()
    logger.info("Account deletion scheduler started (runs every 1 hour)")


def stop_deletion_scheduler() -> None:
    """
    Stop the deletion scheduler (for testing or shutdown).

    Does nothing if the scheduler is not running.
    """
    global _scheduler

    if _scheduler is not None:
        _scheduler.shutdown()
        _scheduler = None
        logger.info("Account deletion scheduler stopped")


def trigger_deletion_manually() -> dict:
    """
    Manually trigger the deletion process (for admin use).

    Deletes all marked users immediately without waiting for threshold.

    Returns:
        A dict with ``'triggered': True`` and ``'queued_users'``, the number
        of users still marked for deletion after the run.
    """
    logger.info("Manual deletion trigger requested - forcing immediate deletion")
    process_deletion_queue(force=True)

    # Return stats (simplified version): whatever is still marked after the
    # forced run.
    marked_users = users_db.search(Query().marked_for_deletion == True)  # noqa: E712
    return {
        'triggered': True,
        'queued_users': len(marked_users)
    }