All checks were successful
Gitea Actions Demo / build-and-push (push) Successful in 24s
- Added tracking events for tasks, penalties, and rewards with timestamps. - Created new TinyDB table for tracking records to maintain audit history. - Developed backend API for querying tracking events with filters and pagination. - Implemented logging for tracking events with per-user rotating log files. - Added unit tests for tracking event creation, querying, and anonymization. - Deferred frontend changes for future implementation. - Established acceptance criteria and documentation for the tracking feature. feat: Introduce account deletion scheduler - Implemented a scheduler to delete accounts marked for deletion after a configurable threshold. - Added new fields to the User model to manage deletion status and attempts. - Created admin API endpoints for managing deletion thresholds and viewing the deletion queue. - Integrated error handling and logging for the deletion process. - Developed unit tests for the deletion scheduler and related API endpoints. - Documented the deletion process and acceptance criteria.
126 lines
3.7 KiB
Python
126 lines
3.7 KiB
Python
"""Helper functions for tracking events database operations."""
|
|
import logging
|
|
from typing import Optional, List
|
|
from tinydb import Query
|
|
from db.db import tracking_events_db
|
|
from models.tracking_event import TrackingEvent, EntityType, ActionType
|
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def insert_tracking_event(event: TrackingEvent) -> str:
    """
    Insert a tracking event into the database.

    Args:
        event: TrackingEvent instance to insert

    Returns:
        The event ID

    Raises:
        Exception: Any error raised while serializing or inserting the
            event is logged (with traceback) and re-raised unchanged.
    """
    # Guard only the write itself, so a problem while building the success
    # log line below is never misreported as a failed insert.
    try:
        tracking_events_db.insert(event.to_dict())
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback,
        # which plain logger.error would drop.
        logger.exception(f"Failed to insert tracking event: {e}")
        raise

    logger.info(f"Tracking event created: {event.action} {event.entity_type} {event.entity_id} for child {event.child_id}")
    return event.id
|
|
|
|
|
|
def get_tracking_events_by_child(
    child_id: str,
    limit: int = 50,
    offset: int = 0,
    entity_type: Optional[EntityType] = None,
    action: Optional[ActionType] = None
) -> tuple[List[TrackingEvent], int]:
    """
    Query tracking events for a specific child with optional filters.

    Results are ordered newest first (by ``occurred_at``, then
    ``created_at``) before pagination is applied.

    Args:
        child_id: Child ID to filter by
        limit: Maximum number of results (default 50, max 500); negative
            values are treated as 0
        offset: Number of results to skip; negative values are treated as 0
        entity_type: Optional filter by entity type
        action: Optional filter by action type

    Returns:
        Tuple of (list of TrackingEvent instances, total count), where the
        total counts every matching record, not just the returned page
    """
    # Clamp pagination inputs. Without the lower bound, a negative limit or
    # offset would be interpreted by the slice below as an index relative to
    # the end of the list and return an arbitrary window of results.
    limit = max(0, min(limit, 500))
    offset = max(0, offset)

    TrackingQuery = Query()
    query_condition = TrackingQuery.child_id == child_id

    if entity_type:
        query_condition &= TrackingQuery.entity_type == entity_type
    if action:
        query_condition &= TrackingQuery.action == action

    all_results = tracking_events_db.search(query_condition)
    total = len(all_results)

    # Sort by occurred_at desc, then created_at desc
    all_results.sort(key=lambda x: (x.get('occurred_at', ''), x.get('created_at', 0)), reverse=True)

    paginated = all_results[offset:offset + limit]
    events = [TrackingEvent.from_dict(r) for r in paginated]

    return events, total
|
|
|
|
|
|
def get_tracking_events_by_user(
    user_id: str,
    limit: int = 50,
    offset: int = 0,
    entity_type: Optional[EntityType] = None
) -> tuple[List[TrackingEvent], int]:
    """
    Query tracking events for a specific user.

    Results are ordered newest first (by ``occurred_at``, then
    ``created_at``) before pagination is applied.

    Args:
        user_id: User ID to filter by
        limit: Maximum number of results (default 50, max 500); negative
            values are treated as 0
        offset: Number of results to skip; negative values are treated as 0
        entity_type: Optional filter by entity type

    Returns:
        Tuple of (list of TrackingEvent instances, total count), where the
        total counts every matching record, not just the returned page
    """
    # Clamp pagination inputs. Without the lower bound, a negative limit or
    # offset would be interpreted by the slice below as an index relative to
    # the end of the list and return an arbitrary window of results.
    limit = max(0, min(limit, 500))
    offset = max(0, offset)

    TrackingQuery = Query()
    query_condition = TrackingQuery.user_id == user_id

    if entity_type:
        query_condition &= TrackingQuery.entity_type == entity_type

    all_results = tracking_events_db.search(query_condition)
    total = len(all_results)

    # Sort by occurred_at desc, then created_at desc
    all_results.sort(key=lambda x: (x.get('occurred_at', ''), x.get('created_at', 0)), reverse=True)

    paginated = all_results[offset:offset + limit]
    events = [TrackingEvent.from_dict(r) for r in paginated]

    return events, total
|
|
|
|
|
|
def anonymize_tracking_events_for_user(user_id: str) -> int:
    """
    Detach a user from their tracking events by overwriting user_id with None.

    Called when a user is deleted.

    Args:
        user_id: ID of the user whose events should be anonymized

    Returns:
        How many records were updated
    """
    owned_by_user = Query().user_id == user_id
    updated = tracking_events_db.update({'user_id': None}, owned_by_user)

    # A falsy result (None or an empty list) counts as zero updates.
    count = len(updated or [])

    logger.info(f"Anonymized {count} tracking events for user {user_id}")
    return count
|