Files
chore/backend/utils/account_deletion_scheduler.py
Ryan Kegel 7e7a2ef49e
Some checks failed
Chore App Build and Push Docker Images / build-and-push (push) Has been cancelled
Implement account deletion handling and improve user feedback
- Added checks for accounts marked for deletion in signup, verification, and password reset processes.
- Updated reward and task listing to sort user-created items first.
- Enhanced user API to clear verification and reset tokens when marking accounts for deletion.
- Introduced tests for marked accounts to ensure proper handling in various scenarios.
- Updated profile and reward edit components to reflect changes in validation and data handling.
2026-02-17 10:38:26 -05:00

369 lines
13 KiB
Python

import logging
import os
import shutil
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler
from apscheduler.schedulers.background import BackgroundScheduler
from tinydb import Query
from config.deletion_config import ACCOUNT_DELETION_THRESHOLD_HOURS
from config.paths import get_user_image_dir
from db.db import users_db, child_db, task_db, reward_db, image_db, pending_reward_db
from models.user import User
from events.types.event import Event
from events.types.event_types import EventType
from events.types.user_deleted import UserDeleted
from events.sse import send_to_user
# Dedicated logger for the account deletion scheduler.
logger = logging.getLogger('account_deletion_scheduler')
logger.setLevel(logging.INFO)

# The rotating file handler needs its target directory to exist up front.
os.makedirs('logs', exist_ok=True)

# File output: rotate at 10MB, keeping five backups.
file_handler = RotatingFileHandler(
    'logs/account_deletion.log',
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
)

# Console output (StreamHandler with no explicit stream argument).
console_handler = logging.StreamHandler()

# Both handlers use the same record layout and are attached file-first.
for _handler in (file_handler, console_handler):
    _handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logger.addHandler(_handler)

# Upper bound on failed deletion attempts before a user is left for manual handling.
MAX_DELETION_ATTEMPTS = 3
def send_user_deleted_event_to_admins(user_id: str, email: str, deleted_at: str):
    """
    Build a USER_DELETED event for a removed account and log it.

    Nothing is broadcast yet: the event object is constructed but never
    handed to the SSE layer, because there is currently no way to enumerate
    active admin connections. Only the log line below is emitted.

    Args:
        user_id: Identifier of the deleted user.
        email: Email address of the deleted user.
        deleted_at: Timestamp string describing when the deletion happened.
    """
    event_name = EventType.USER_DELETED.value
    payload = UserDeleted(user_id, email, deleted_at)
    # Constructed now so the planned broadcast only has to add the send step.
    event = Event(event_name, payload)

    # TODO: once an admin role system exists, look up the admin users and
    # deliver the event only to them, along the lines of:
    #     admin_users = get_admin_users()
    #     for admin in admin_users:
    #         send_to_user(admin.id, event.to_dict())
    logger.info(f"USER_DELETED event created for {user_id} ({email}) at {deleted_at}")
def is_user_due_for_deletion(user: User) -> bool:
    """
    Check whether a marked user has passed the deletion grace period.

    The grace period is ACCOUNT_DELETION_THRESHOLD_HOURS, counted from the
    ISO-8601 timestamp stored in ``user.marked_for_deletion_at``.

    Args:
        user: The user to evaluate.

    Returns:
        True when the user is marked for deletion and the current time is at
        or past ``marked_for_deletion_at`` plus the threshold. False when the
        user is not marked, has no timestamp, or the timestamp cannot be
        parsed (the parse error is logged).
    """
    if not user.marked_for_deletion or not user.marked_for_deletion_at:
        return False
    try:
        marked_at = datetime.fromisoformat(user.marked_for_deletion_at)
        due_at = marked_at + timedelta(hours=ACCOUNT_DELETION_THRESHOLD_HOURS)
        # Take "now" in the same frame as the stored timestamp. For an aware
        # timestamp this yields an aware "now", so the comparison is made on
        # absolute time instead of dropping the offset and comparing against
        # local wall-clock time. For a naive timestamp tzinfo is None and
        # this is plain local time, exactly as before.
        # NOTE(review): naive timestamps are assumed to be local time -- confirm
        # against the code that writes marked_for_deletion_at.
        now = datetime.now(marked_at.tzinfo)
        return now >= due_at
    except (ValueError, TypeError) as e:
        logger.error(f"Error parsing marked_for_deletion_at for user {user.id}: {e}")
        return False
def get_deletion_attempt_count(user: User) -> int:
    """
    Report how many deletion attempts have been recorded for a user.

    Attempts are not counted individually: the only signal is whether
    ``user.deletion_attempted_at`` is set, so the result is either 0 or 1.
    Tracking a true count would need a dedicated field or table.

    Args:
        user: The user to inspect.

    Returns:
        1 if ``deletion_attempted_at`` is truthy, otherwise 0.
    """
    return 1 if user.deletion_attempted_at else 0
def _remove_user_records(db, label: str, user_id: str) -> bool:
    """Remove every record in ``db`` whose ``user_id`` matches; return False on error."""
    try:
        removed = db.remove(Query().user_id == user_id)
        if removed:
            logger.info(f"Deleted {len(removed)} {label} for user {user_id}")
    except Exception as e:
        logger.error(f"Failed to delete {label} for user {user_id}: {e}")
        return False
    return True


def _record_failed_deletion_attempt(user_id: str) -> None:
    """Clear ``deletion_in_progress`` and stamp ``deletion_attempted_at`` (best effort)."""
    try:
        users_db.update({
            'deletion_in_progress': False,
            'deletion_attempted_at': datetime.now().isoformat()
        }, Query().id == user_id)
    except Exception as e:
        # Best effort only, but never silent: a stuck flag needs to be visible.
        logger.error(f"Failed to reset deletion flags for user {user_id}: {e}")


def delete_user_data(user: User) -> bool:
    """
    Delete all data associated with a user, then the user record itself.

    Order of operations:
        1. Set ``deletion_in_progress`` on the user record.
        2. Remove pending rewards belonging to the user's children.
        3. Remove the children (skipped if step 2 failed, see below).
        4. Remove user-created tasks.
        5. Remove user-created rewards.
        6. Remove the user's image records.
        7. Remove the user's image directory from the filesystem.
        8. Remove the user record, but only if every earlier step succeeded.

    A failure in steps 2-7 does not stop the later cleanup steps; it only
    prevents step 8. Whenever the user record survives, ``deletion_in_progress``
    is cleared and ``deletion_attempted_at`` is stamped so a later run can
    retry.

    Args:
        user: The user whose data should be removed.

    Returns:
        True if the user record was removed, False otherwise.
    """
    user_id = user.id
    user_query = Query()
    try:
        # Step 1: Set deletion_in_progress flag
        logger.info(f"Starting deletion for user {user_id} ({user.email})")
        users_db.update({'deletion_in_progress': True}, user_query.id == user_id)

        # Step 2: Remove pending rewards for user's children
        pending_rewards_cleared = True
        try:
            children = child_db.search(user_query.user_id == user_id)
            for child in children:
                child_id = child['id']
                removed = pending_reward_db.remove(user_query.child_id == child_id)
                if removed:
                    logger.info(f"Deleted {len(removed)} pending rewards for child {child_id}")
        except Exception as e:
            logger.error(f"Failed to delete pending rewards for user {user_id}: {e}")
            pending_rewards_cleared = False
        success = pending_rewards_cleared

        # Step 3: Remove children. Pending rewards are located through the
        # child records, so the children must be kept when step 2 failed or a
        # retry could no longer find the leftover pending rewards.
        if pending_rewards_cleared:
            if not _remove_user_records(child_db, 'children', user_id):
                success = False
        else:
            logger.warning(
                f"Keeping children for user {user_id} until their pending rewards can be deleted"
            )

        # Steps 4-6: Remove user-created tasks, rewards and image records
        for db, label in (
            (task_db, 'tasks'),
            (reward_db, 'rewards'),
            (image_db, 'images from database'),
        ):
            if not _remove_user_records(db, label, user_id):
                success = False

        # Step 7: Delete user's image directory from filesystem
        try:
            user_image_dir = get_user_image_dir(user_id)
            if os.path.exists(user_image_dir):
                shutil.rmtree(user_image_dir)
                logger.info(f"Deleted image directory for user {user_id}")
            else:
                logger.info(f"Image directory for user {user_id} does not exist (already deleted or never created)")
        except Exception as e:
            logger.error(f"Failed to delete image directory for user {user_id}: {e}")
            success = False

        if not success:
            # Deletion failed, update flags
            logger.error(f"Deletion failed for user {user_id}, marking for retry")
            _record_failed_deletion_attempt(user_id)
            return False

        # Step 8: Remove user record
        try:
            users_db.remove(user_query.id == user_id)
        except Exception as e:
            logger.error(f"Failed to delete user record for {user_id}: {e}")
            # The record still exists, so leave it in a retryable state.
            _record_failed_deletion_attempt(user_id)
            return False

        deleted_at = datetime.now().isoformat()
        logger.info(f"Successfully deleted user {user_id} ({user.email})")
        # The user is gone at this point; a notification problem must not
        # turn a completed deletion into a reported failure.
        try:
            send_user_deleted_event_to_admins(user_id, user.email, deleted_at)
        except Exception as e:
            logger.error(f"Failed to send USER_DELETED event for user {user_id}: {e}")
        return True
    except Exception as e:
        logger.error(f"Unexpected error during deletion for user {user_id}: {e}")
        # Try to clear the in_progress flag
        _record_failed_deletion_attempt(user_id)
        return False
def process_deletion_queue(force=False):
    """
    Process the deletion queue: find users due for deletion and delete them.

    Each marked user is handled independently: an unexpected error while
    processing one record is logged and counted as a failure, and the run
    continues with the remaining users.

    Args:
        force (bool): If True, delete all marked users immediately without checking threshold.
            If False, only delete users past the threshold time.
    """
    if force:
        logger.info("Starting FORCED deletion scheduler run (bypassing time threshold)")
    else:
        logger.info("Starting deletion scheduler run")

    processed = 0
    deleted = 0
    failed = 0
    try:
        # Get all marked users. The explicit "== True" builds a TinyDB query
        # condition; it is not a plain boolean test.
        marked_users = users_db.search(Query().marked_for_deletion == True)  # noqa: E712
        if not marked_users:
            logger.info("No users marked for deletion")
            return
        logger.info(f"Found {len(marked_users)} users marked for deletion")

        for user_dict in marked_users:
            processed += 1
            try:
                user = User.from_dict(user_dict)

                # Check if user is due for deletion (skip check if force=True)
                if not force and not is_user_due_for_deletion(user):
                    continue

                # Check retry limit
                attempt_count = get_deletion_attempt_count(user)
                if attempt_count >= MAX_DELETION_ATTEMPTS:
                    logger.critical(
                        f"User {user.id} ({user.email}) has failed deletion {attempt_count} times. "
                        "Manual intervention required."
                    )
                    continue

                # A leftover in-progress flag is only reported; the deletion
                # is still attempted again below.
                if user.deletion_in_progress:
                    logger.warning(
                        f"User {user.id} ({user.email}) has deletion_in_progress=True. "
                        "This may indicate a previous run was interrupted. Retrying..."
                    )

                # Attempt deletion
                if delete_user_data(user):
                    deleted += 1
                else:
                    failed += 1
            except Exception as e:
                # Keep one bad record from aborting the rest of the queue.
                failed += 1
                logger.error(f"Error processing marked user {user_dict.get('id')}: {e}")

        logger.info(
            f"Deletion scheduler run complete: "
            f"{processed} users processed, {deleted} deleted, {failed} failed"
        )
    except Exception as e:
        logger.error(f"Error in deletion scheduler: {e}")
def check_interrupted_deletions():
    """
    Reset users left with deletion_in_progress=True by an earlier run.

    Looks up users that are both marked for deletion and flagged as in
    progress, logs how many were found, and clears the in-progress flag on
    each one. No deletion is performed here. Any error is logged and
    swallowed.
    """
    logger.info("Checking for interrupted deletions from previous runs")
    try:
        user_q = Query()
        # "== True" builds TinyDB query conditions, combined with "&".
        stuck_users = users_db.search(
            (user_q.marked_for_deletion == True) &  # noqa: E712
            (user_q.deletion_in_progress == True)  # noqa: E712
        )
        if not stuck_users:
            return

        logger.warning(
            f"Found {len(stuck_users)} users with interrupted deletions. "
            "Will retry on next scheduler run."
        )
        # Reset the flag so they can be retried
        for record in stuck_users:
            users_db.update({'deletion_in_progress': False}, user_q.id == record['id'])
    except Exception as e:
        logger.error(f"Error checking for interrupted deletions: {e}")
# Global scheduler instance: None until start_deletion_scheduler() creates one,
# and set back to None by stop_deletion_scheduler().
_scheduler = None
def start_deletion_scheduler():
    """
    Start the background deletion scheduler.

    Should be called once during application startup. If a scheduler is
    already registered, a warning is logged and nothing else happens.
    Otherwise interrupted deletions are reset first, then an hourly
    ``process_deletion_queue`` job is scheduled and the scheduler is started.

    Raises:
        Any exception raised while creating or starting the scheduler is
        propagated; in that case no scheduler is registered, so a later call
        can try again.
    """
    global _scheduler
    if _scheduler is not None:
        logger.warning("Deletion scheduler is already running")
        return

    logger.info("Starting account deletion scheduler")
    # Check for interrupted deletions from previous runs
    check_interrupted_deletions()

    # Create and start scheduler
    scheduler = BackgroundScheduler()
    # Run every hour
    scheduler.add_job(
        process_deletion_queue,
        'interval',
        hours=1,
        id='account_deletion',
        name='Account Deletion Scheduler',
        replace_existing=True
    )
    scheduler.start()

    # Register the instance only once it is actually running; otherwise a
    # failed start would make every later call report "already running".
    _scheduler = scheduler
    logger.info("Account deletion scheduler started (runs every 1 hour)")
def stop_deletion_scheduler():
    """
    Shut down the deletion scheduler, if one is registered.

    Intended for tests and application shutdown. Does nothing when no
    scheduler is registered.
    """
    global _scheduler
    if _scheduler is None:
        return

    _scheduler.shutdown()
    _scheduler = None
    logger.info("Account deletion scheduler stopped")
def trigger_deletion_manually():
    """
    Run the deletion queue immediately, ignoring the time threshold.

    Intended for admin use. Calls ``process_deletion_queue(force=True)`` and
    then reports a simplified summary.

    Returns:
        dict: ``'triggered'`` is always True; ``'queued_users'`` is the number
        of users that are still marked for deletion after the forced run.
    """
    logger.info("Manual deletion trigger requested - forcing immediate deletion")
    process_deletion_queue(force=True)

    # Return stats (simplified version): count whoever is still marked.
    still_marked = users_db.search(Query().marked_for_deletion == True)  # noqa: E712
    return {
        'triggered': True,
        'queued_users': len(still_marked),
    }