feat: Implement task and reward tracking feature
All checks were successful
Gitea Actions Demo / build-and-push (push) Successful in 24s

- Added tracking events for tasks, penalties, and rewards with timestamps.
- Created new TinyDB table for tracking records to maintain audit history.
- Developed backend API for querying tracking events with filters and pagination.
- Implemented logging for tracking events with per-user rotating log files.
- Added unit tests for tracking event creation, querying, and anonymization.
- Deferred frontend changes for future implementation.
- Established acceptance criteria and documentation for the tracking feature.

feat: Introduce account deletion scheduler

- Implemented a scheduler to delete accounts marked for deletion after a configurable threshold.
- Added new fields to the User model to manage deletion status and attempts.
- Created admin API endpoints for managing deletion thresholds and viewing the deletion queue.
- Integrated error handling and logging for the deletion process.
- Developed unit tests for the deletion scheduler and related API endpoints.
- Documented the deletion process and acceptance criteria.
This commit is contained in:
2026-02-09 15:39:43 -05:00
parent 27f02224ab
commit 3dee8b80a2
20 changed files with 1450 additions and 0 deletions

View File

@@ -9,19 +9,23 @@ from api.pending_reward import PendingReward as PendingRewardResponse
from api.reward_status import RewardStatus
from api.utils import send_event_for_current_user
from db.db import child_db, task_db, reward_db, pending_reward_db
from db.tracking import insert_tracking_event
from events.types.child_modified import ChildModified
from events.types.child_reward_request import ChildRewardRequest
from events.types.child_reward_triggered import ChildRewardTriggered
from events.types.child_rewards_set import ChildRewardsSet
from events.types.child_task_triggered import ChildTaskTriggered
from events.types.child_tasks_set import ChildTasksSet
from events.types.tracking_event_created import TrackingEventCreated
from events.types.event import Event
from events.types.event_types import EventType
from models.child import Child
from models.pending_reward import PendingReward
from models.reward import Reward
from models.task import Task
from models.tracking_event import TrackingEvent
from api.utils import get_validated_user_id
from utils.tracking_logger import log_tracking_event
from collections import defaultdict
import logging
@@ -364,14 +368,38 @@ def trigger_child_task(id):
if not task_result:
return jsonify({'error': 'Task not found in task database'}), 404
task: Task = Task.from_dict(task_result[0])
# Capture points before modification
points_before = child.points
# update the child's points based on task type
if task.is_good:
child.points += task.points
else:
child.points -= task.points
child.points = max(child.points, 0)
# update the child in the database
child_db.update({'points': child.points}, ChildQuery.id == id)
# Create tracking event
entity_type = 'penalty' if not task.is_good else 'task'
tracking_event = TrackingEvent.create_event(
user_id=user_id,
child_id=child.id,
entity_type=entity_type,
entity_id=task.id,
action='activated',
points_before=points_before,
points_after=child.points,
metadata={'task_name': task.name, 'is_good': task.is_good}
)
insert_tracking_event(tracking_event)
log_tracking_event(tracking_event)
# Send tracking event via SSE
send_event_for_current_user(Event(EventType.TRACKING_EVENT_CREATED.value, TrackingEventCreated(tracking_event.id, child.id, entity_type, 'activated')))
resp = send_event_for_current_user(Event(EventType.CHILD_TASK_TRIGGERED.value, ChildTaskTriggered(task.id, child.id, child.points)))
if resp:
return resp
@@ -609,10 +637,31 @@ def trigger_child_reward(id):
if removed:
send_event_for_current_user(Event(EventType.CHILD_REWARD_REQUEST.value, ChildRewardRequest(reward.id, child.id, ChildRewardRequest.REQUEST_GRANTED)))
# Capture points before modification
points_before = child.points
# update the child's points based on reward cost
child.points -= reward.cost
# update the child in the database
child_db.update({'points': child.points}, ChildQuery.id == id)
# Create tracking event
tracking_event = TrackingEvent.create_event(
user_id=user_id,
child_id=child.id,
entity_type='reward',
entity_id=reward.id,
action='redeemed',
points_before=points_before,
points_after=child.points,
metadata={'reward_name': reward.name, 'reward_cost': reward.cost}
)
insert_tracking_event(tracking_event)
log_tracking_event(tracking_event)
# Send tracking event via SSE
send_event_for_current_user(Event(EventType.TRACKING_EVENT_CREATED.value, TrackingEventCreated(tracking_event.id, child.id, 'reward', 'redeemed')))
send_event_for_current_user(Event(EventType.CHILD_REWARD_TRIGGERED.value, ChildRewardTriggered(reward.id, child.id, child.points)))
return jsonify({'message': f'{reward.name} assigned to {child.name}.', 'points': child.points, 'id': child.id}), 200
@@ -707,6 +756,24 @@ def request_reward(id):
pending = PendingReward(child_id=child.id, reward_id=reward.id, user_id=user_id)
pending_reward_db.insert(pending.to_dict())
logger.info(f'Pending reward request created for child {child.name} for reward {reward.name}')
# Create tracking event (no points change on request)
tracking_event = TrackingEvent.create_event(
user_id=user_id,
child_id=child.id,
entity_type='reward',
entity_id=reward.id,
action='requested',
points_before=child.points,
points_after=child.points,
metadata={'reward_name': reward.name, 'reward_cost': reward.cost}
)
insert_tracking_event(tracking_event)
log_tracking_event(tracking_event)
# Send tracking event via SSE
send_event_for_current_user(Event(EventType.TRACKING_EVENT_CREATED.value, TrackingEventCreated(tracking_event.id, child.id, 'reward', 'requested')))
send_event_for_current_user(Event(EventType.CHILD_REWARD_REQUEST.value, ChildRewardRequest(child.id, reward.id, ChildRewardRequest.REQUEST_CREATED)))
return jsonify({
'message': f'Reward request for {reward.name} submitted for {child.name}.',
@@ -734,6 +801,12 @@ def cancel_request_reward(id):
child = Child.from_dict(result[0])
# Fetch reward details for tracking metadata
RewardQuery = Query()
reward_result = reward_db.get((RewardQuery.id == reward_id) & ((RewardQuery.user_id == user_id) | (RewardQuery.user_id == None)))
reward_name = reward_result.get('name') if reward_result else 'Unknown'
reward_cost = reward_result.get('cost', 0) if reward_result else 0
# Remove matching pending reward request
PendingQuery = Query()
removed = pending_reward_db.remove(
@@ -743,6 +816,23 @@ def cancel_request_reward(id):
if not removed:
return jsonify({'error': 'No pending request found for this reward'}), 404
# Create tracking event (no points change on cancel)
tracking_event = TrackingEvent.create_event(
user_id=user_id,
child_id=child.id,
entity_type='reward',
entity_id=reward_id,
action='cancelled',
points_before=child.points,
points_after=child.points,
metadata={'reward_name': reward_name, 'reward_cost': reward_cost}
)
insert_tracking_event(tracking_event)
log_tracking_event(tracking_event)
# Send tracking event via SSE
send_event_for_current_user(Event(EventType.TRACKING_EVENT_CREATED.value, TrackingEventCreated(tracking_event.id, child.id, 'reward', 'cancelled')))
# Notify user that the request was cancelled
resp = send_event_for_current_user(Event(EventType.CHILD_REWARD_REQUEST.value, ChildRewardRequest(child.id, reward_id, ChildRewardRequest.REQUEST_CANCELLED)))
if resp:

122
backend/api/tracking_api.py Normal file
View File

@@ -0,0 +1,122 @@
from flask import Blueprint, request, jsonify
from api.utils import get_validated_user_id
from db.tracking import get_tracking_events_by_child, get_tracking_events_by_user
from models.tracking_event import TrackingEvent
from functools import wraps
import jwt
from tinydb import Query
from db.db import users_db
from models.user import User
tracking_api = Blueprint('tracking_api', __name__)
def admin_required(f):
"""
Decorator to require admin role for endpoints.
"""
@wraps(f)
def decorated_function(*args, **kwargs):
# Get JWT token from cookie
token = request.cookies.get('token')
if not token:
return jsonify({'error': 'Authentication required', 'code': 'AUTH_REQUIRED'}), 401
try:
# Verify JWT token
payload = jwt.decode(token, 'supersecretkey', algorithms=['HS256'])
user_id = payload.get('user_id')
if not user_id:
return jsonify({'error': 'Invalid token', 'code': 'INVALID_TOKEN'}), 401
# Get user from database
Query_ = Query()
user_dict = users_db.get(Query_.id == user_id)
if not user_dict:
return jsonify({'error': 'User not found', 'code': 'USER_NOT_FOUND'}), 404
user = User.from_dict(user_dict)
# Check if user has admin role
if user.role != 'admin':
return jsonify({'error': 'Admin access required', 'code': 'ADMIN_REQUIRED'}), 403
# Store user_id in request context
request.admin_user_id = user_id
return f(*args, **kwargs)
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token expired', 'code': 'TOKEN_EXPIRED'}), 401
except jwt.InvalidTokenError:
return jsonify({'error': 'Invalid token', 'code': 'INVALID_TOKEN'}), 401
return decorated_function
@tracking_api.route('/admin/tracking', methods=['GET'])
@admin_required
def get_tracking():
"""
Admin endpoint to query tracking events with filters and pagination.
Query params:
- child_id: Filter by child ID (optional)
- user_id: Filter by user ID (optional, admin only)
- entity_type: Filter by entity type (task/reward/penalty) (optional)
- action: Filter by action type (activated/requested/redeemed/cancelled) (optional)
- limit: Max results (default 50, max 500)
- offset: Pagination offset (default 0)
"""
child_id = request.args.get('child_id')
filter_user_id = request.args.get('user_id')
entity_type = request.args.get('entity_type')
action = request.args.get('action')
limit = int(request.args.get('limit', 50))
offset = int(request.args.get('offset', 0))
# Validate limit
limit = min(max(limit, 1), 500)
offset = max(offset, 0)
# Validate filters
if entity_type and entity_type not in ['task', 'reward', 'penalty']:
return jsonify({'error': 'Invalid entity_type', 'code': 'INVALID_ENTITY_TYPE'}), 400
if action and action not in ['activated', 'requested', 'redeemed', 'cancelled']:
return jsonify({'error': 'Invalid action', 'code': 'INVALID_ACTION'}), 400
# Query tracking events
if child_id:
events, total = get_tracking_events_by_child(
child_id=child_id,
limit=limit,
offset=offset,
entity_type=entity_type,
action=action
)
elif filter_user_id:
events, total = get_tracking_events_by_user(
user_id=filter_user_id,
limit=limit,
offset=offset,
entity_type=entity_type
)
else:
return jsonify({
'error': 'Either child_id or user_id is required',
'code': 'MISSING_FILTER'
}), 400
# Convert to dict
events_data = [event.to_dict() for event in events]
return jsonify({
'tracking_events': events_data,
'total': total,
'limit': limit,
'offset': offset,
'count': len(events_data)
}), 200

View File

@@ -33,3 +33,9 @@ def get_user_image_dir(username: str | None) -> str:
if username:
return os.path.join(PROJECT_ROOT, get_base_data_dir(), 'images', username)
return os.path.join(PROJECT_ROOT, 'resources', 'images')
def get_logs_dir() -> str:
"""
Return the absolute directory path for application logs.
"""
return os.path.join(PROJECT_ROOT, 'logs')

View File

@@ -73,6 +73,7 @@ reward_path = os.path.join(base_dir, 'rewards.json')
image_path = os.path.join(base_dir, 'images.json')
pending_reward_path = os.path.join(base_dir, 'pending_rewards.json')
users_path = os.path.join(base_dir, 'users.json')
tracking_events_path = os.path.join(base_dir, 'tracking_events.json')
# Use separate TinyDB instances/files for each collection
_child_db = TinyDB(child_path, indent=2)
@@ -81,6 +82,7 @@ _reward_db = TinyDB(reward_path, indent=2)
_image_db = TinyDB(image_path, indent=2)
_pending_rewards_db = TinyDB(pending_reward_path, indent=2)
_users_db = TinyDB(users_path, indent=2)
_tracking_events_db = TinyDB(tracking_events_path, indent=2)
# Expose table objects wrapped with locking
child_db = LockedTable(_child_db)
@@ -89,6 +91,7 @@ reward_db = LockedTable(_reward_db)
image_db = LockedTable(_image_db)
pending_reward_db = LockedTable(_pending_rewards_db)
users_db = LockedTable(_users_db)
tracking_events_db = LockedTable(_tracking_events_db)
if os.environ.get('DB_ENV', 'prod') == 'test':
child_db.truncate()
@@ -97,4 +100,5 @@ if os.environ.get('DB_ENV', 'prod') == 'test':
image_db.truncate()
pending_reward_db.truncate()
users_db.truncate()
tracking_events_db.truncate()

125
backend/db/tracking.py Normal file
View File

@@ -0,0 +1,125 @@
"""Helper functions for tracking events database operations."""
import logging
from typing import Optional, List
from tinydb import Query
from db.db import tracking_events_db
from models.tracking_event import TrackingEvent, EntityType, ActionType
logger = logging.getLogger(__name__)
def insert_tracking_event(event: TrackingEvent) -> str:
"""
Insert a tracking event into the database.
Args:
event: TrackingEvent instance to insert
Returns:
The event ID
"""
try:
tracking_events_db.insert(event.to_dict())
logger.info(f"Tracking event created: {event.action} {event.entity_type} {event.entity_id} for child {event.child_id}")
return event.id
except Exception as e:
logger.error(f"Failed to insert tracking event: {e}")
raise
def get_tracking_events_by_child(
child_id: str,
limit: int = 50,
offset: int = 0,
entity_type: Optional[EntityType] = None,
action: Optional[ActionType] = None
) -> tuple[List[TrackingEvent], int]:
"""
Query tracking events for a specific child with optional filters.
Args:
child_id: Child ID to filter by
limit: Maximum number of results (default 50, max 500)
offset: Number of results to skip
entity_type: Optional filter by entity type
action: Optional filter by action type
Returns:
Tuple of (list of TrackingEvent instances, total count)
"""
limit = min(limit, 500)
TrackingQuery = Query()
query_condition = TrackingQuery.child_id == child_id
if entity_type:
query_condition &= TrackingQuery.entity_type == entity_type
if action:
query_condition &= TrackingQuery.action == action
all_results = tracking_events_db.search(query_condition)
total = len(all_results)
# Sort by occurred_at desc, then created_at desc
all_results.sort(key=lambda x: (x.get('occurred_at', ''), x.get('created_at', 0)), reverse=True)
paginated = all_results[offset:offset + limit]
events = [TrackingEvent.from_dict(r) for r in paginated]
return events, total
def get_tracking_events_by_user(
user_id: str,
limit: int = 50,
offset: int = 0,
entity_type: Optional[EntityType] = None
) -> tuple[List[TrackingEvent], int]:
"""
Query tracking events for a specific user.
Args:
user_id: User ID to filter by
limit: Maximum number of results
offset: Number of results to skip
entity_type: Optional filter by entity type
Returns:
Tuple of (list of TrackingEvent instances, total count)
"""
limit = min(limit, 500)
TrackingQuery = Query()
query_condition = TrackingQuery.user_id == user_id
if entity_type:
query_condition &= TrackingQuery.entity_type == entity_type
all_results = tracking_events_db.search(query_condition)
total = len(all_results)
all_results.sort(key=lambda x: (x.get('occurred_at', ''), x.get('created_at', 0)), reverse=True)
paginated = all_results[offset:offset + limit]
events = [TrackingEvent.from_dict(r) for r in paginated]
return events, total
def anonymize_tracking_events_for_user(user_id: str) -> int:
"""
Anonymize tracking events by setting user_id to None.
Called when a user is deleted.
Args:
user_id: User ID to anonymize
Returns:
Number of records anonymized
"""
TrackingQuery = Query()
result = tracking_events_db.update({'user_id': None}, TrackingQuery.user_id == user_id)
count = len(result) if result else 0
logger.info(f"Anonymized {count} tracking events for user {user_id}")
return count

View File

@@ -16,3 +16,5 @@ class EventType(Enum):
USER_MARKED_FOR_DELETION = "user_marked_for_deletion"
USER_DELETED = "user_deleted"
TRACKING_EVENT_CREATED = "tracking_event_created"

View File

@@ -0,0 +1,27 @@
from events.types.payload import Payload
class TrackingEventCreated(Payload):
def __init__(self, tracking_event_id: str, child_id: str, entity_type: str, action: str):
super().__init__({
'tracking_event_id': tracking_event_id,
'child_id': child_id,
'entity_type': entity_type,
'action': action
})
@property
def tracking_event_id(self) -> str:
return self.get("tracking_event_id")
@property
def child_id(self) -> str:
return self.get("child_id")
@property
def entity_type(self) -> str:
return self.get("entity_type")
@property
def action(self) -> str:
return self.get("action")

View File

@@ -10,6 +10,7 @@ from api.child_api import child_api
from api.image_api import image_api
from api.reward_api import reward_api
from api.task_api import task_api
from api.tracking_api import tracking_api
from api.user_api import user_api
from config.version import get_full_version
@@ -37,6 +38,7 @@ app.register_blueprint(task_api)
app.register_blueprint(image_api)
app.register_blueprint(auth_api)
app.register_blueprint(user_api)
app.register_blueprint(tracking_api)
app.config.update(
MAIL_SERVER='smtp.gmail.com',

View File

@@ -0,0 +1,91 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Literal, Optional
from models.base import BaseModel
EntityType = Literal['task', 'reward', 'penalty']
ActionType = Literal['activated', 'requested', 'redeemed', 'cancelled']
@dataclass
class TrackingEvent(BaseModel):
user_id: Optional[str]
child_id: str
entity_type: EntityType
entity_id: str
action: ActionType
points_before: int
points_after: int
delta: int
occurred_at: str # UTC ISO 8601 timestamp
metadata: Optional[dict] = None
def __post_init__(self):
"""Validate invariants after initialization."""
if self.delta != self.points_after - self.points_before:
raise ValueError(
f"Delta invariant violated: {self.delta} != {self.points_after} - {self.points_before}"
)
@classmethod
def from_dict(cls, d: dict):
return cls(
user_id=d.get('user_id'),
child_id=d.get('child_id'),
entity_type=d.get('entity_type'),
entity_id=d.get('entity_id'),
action=d.get('action'),
points_before=d.get('points_before'),
points_after=d.get('points_after'),
delta=d.get('delta'),
occurred_at=d.get('occurred_at'),
metadata=d.get('metadata'),
id=d.get('id'),
created_at=d.get('created_at'),
updated_at=d.get('updated_at')
)
def to_dict(self):
base = super().to_dict()
base.update({
'user_id': self.user_id,
'child_id': self.child_id,
'entity_type': self.entity_type,
'entity_id': self.entity_id,
'action': self.action,
'points_before': self.points_before,
'points_after': self.points_after,
'delta': self.delta,
'occurred_at': self.occurred_at,
'metadata': self.metadata
})
return base
@staticmethod
def create_event(
user_id: Optional[str],
child_id: str,
entity_type: EntityType,
entity_id: str,
action: ActionType,
points_before: int,
points_after: int,
metadata: Optional[dict] = None
) -> 'TrackingEvent':
"""Factory method to create a tracking event with server timestamp."""
delta = points_after - points_before
occurred_at = datetime.now(timezone.utc).isoformat()
return TrackingEvent(
user_id=user_id,
child_id=child_id,
entity_type=entity_type,
entity_id=entity_id,
action=action,
points_before=points_before,
points_after=points_after,
delta=delta,
occurred_at=occurred_at,
metadata=metadata
)

View File

@@ -0,0 +1,254 @@
import os
os.environ['DB_ENV'] = 'test'
import pytest
from models.tracking_event import TrackingEvent
from db.tracking import (
insert_tracking_event,
get_tracking_events_by_child,
get_tracking_events_by_user,
anonymize_tracking_events_for_user
)
from db.db import tracking_events_db
def test_tracking_event_creation():
"""Test creating a tracking event with factory method."""
event = TrackingEvent.create_event(
user_id='user123',
child_id='child456',
entity_type='task',
entity_id='task789',
action='activated',
points_before=10,
points_after=20,
metadata={'task_name': 'Homework'}
)
assert event.user_id == 'user123'
assert event.child_id == 'child456'
assert event.entity_type == 'task'
assert event.action == 'activated'
assert event.points_before == 10
assert event.points_after == 20
assert event.delta == 10
assert event.metadata == {'task_name': 'Homework'}
assert event.occurred_at # Should have ISO timestamp
def test_tracking_event_delta_invariant():
"""Test that delta invariant is enforced."""
with pytest.raises(ValueError, match="Delta invariant violated"):
TrackingEvent(
user_id='user123',
child_id='child456',
entity_type='task',
entity_id='task789',
action='activated',
points_before=10,
points_after=20,
delta=5, # Wrong! Should be 10
occurred_at='2026-02-09T12:00:00Z'
)
def test_insert_and_query_tracking_event():
"""Test inserting and querying tracking events."""
tracking_events_db.truncate()
event1 = TrackingEvent.create_event(
user_id='user1',
child_id='child1',
entity_type='task',
entity_id='task1',
action='activated',
points_before=0,
points_after=10
)
event2 = TrackingEvent.create_event(
user_id='user1',
child_id='child1',
entity_type='reward',
entity_id='reward1',
action='requested',
points_before=10,
points_after=10
)
insert_tracking_event(event1)
insert_tracking_event(event2)
# Query by child
events, total = get_tracking_events_by_child('child1', limit=10, offset=0)
assert total == 2
assert len(events) == 2
def test_query_with_filters():
"""Test querying with entity_type and action filters."""
tracking_events_db.truncate()
# Insert task activation
task_event = TrackingEvent.create_event(
user_id='user1',
child_id='child1',
entity_type='task',
entity_id='task1',
action='activated',
points_before=0,
points_after=10
)
insert_tracking_event(task_event)
# Insert reward request
reward_event = TrackingEvent.create_event(
user_id='user1',
child_id='child1',
entity_type='reward',
entity_id='reward1',
action='requested',
points_before=10,
points_after=10
)
insert_tracking_event(reward_event)
# Filter by entity_type
events, total = get_tracking_events_by_child('child1', entity_type='task')
assert total == 1
assert events[0].entity_type == 'task'
# Filter by action
events, total = get_tracking_events_by_child('child1', action='requested')
assert total == 1
assert events[0].action == 'requested'
def test_pagination():
"""Test offset-based pagination."""
tracking_events_db.truncate()
# Insert 5 events
for i in range(5):
event = TrackingEvent.create_event(
user_id='user1',
child_id='child1',
entity_type='task',
entity_id=f'task{i}',
action='activated',
points_before=i * 10,
points_after=(i + 1) * 10
)
insert_tracking_event(event)
# First page
events, total = get_tracking_events_by_child('child1', limit=2, offset=0)
assert total == 5
assert len(events) == 2
# Second page
events, total = get_tracking_events_by_child('child1', limit=2, offset=2)
assert total == 5
assert len(events) == 2
# Last page
events, total = get_tracking_events_by_child('child1', limit=2, offset=4)
assert total == 5
assert len(events) == 1
def test_anonymize_tracking_events():
"""Test anonymizing tracking events on user deletion."""
tracking_events_db.truncate()
event = TrackingEvent.create_event(
user_id='user_to_delete',
child_id='child1',
entity_type='task',
entity_id='task1',
action='activated',
points_before=0,
points_after=10
)
insert_tracking_event(event)
# Anonymize
count = anonymize_tracking_events_for_user('user_to_delete')
assert count == 1
# Verify user_id is None
events, total = get_tracking_events_by_child('child1')
assert total == 1
assert events[0].user_id is None
assert events[0].child_id == 'child1' # Child data preserved
def test_points_change_correctness():
"""Test that points before/after/delta are tracked correctly."""
tracking_events_db.truncate()
# Task activation (points increase)
task_event = TrackingEvent.create_event(
user_id='user1',
child_id='child1',
entity_type='task',
entity_id='task1',
action='activated',
points_before=50,
points_after=60
)
assert task_event.delta == 10
insert_tracking_event(task_event)
# Reward redeem (points decrease)
reward_event = TrackingEvent.create_event(
user_id='user1',
child_id='child1',
entity_type='reward',
entity_id='reward1',
action='redeemed',
points_before=60,
points_after=40
)
assert reward_event.delta == -20
insert_tracking_event(reward_event)
# Query and verify
events, _ = get_tracking_events_by_child('child1')
assert len(events) == 2
assert events[0].delta == -20 # Most recent (sorted desc)
assert events[1].delta == 10
def test_no_points_change_for_request_and_cancel():
"""Test that reward request and cancel have delta=0."""
tracking_events_db.truncate()
# Request
request_event = TrackingEvent.create_event(
user_id='user1',
child_id='child1',
entity_type='reward',
entity_id='reward1',
action='requested',
points_before=100,
points_after=100
)
assert request_event.delta == 0
insert_tracking_event(request_event)
# Cancel
cancel_event = TrackingEvent.create_event(
user_id='user1',
child_id='child1',
entity_type='reward',
entity_id='reward1',
action='cancelled',
points_before=100,
points_after=100
)
assert cancel_event.delta == 0
insert_tracking_event(cancel_event)
events, _ = get_tracking_events_by_child('child1')
assert all(e.delta == 0 for e in events)

View File

@@ -0,0 +1,84 @@
"""Per-user rotating audit logger for tracking events."""
import logging
import os
from logging.handlers import RotatingFileHandler
from config.paths import get_logs_dir
from models.tracking_event import TrackingEvent
# Store handlers per user_id to avoid recreating
_user_loggers = {}
def get_tracking_logger(user_id: str) -> logging.Logger:
"""
Get or create a per-user rotating file logger for tracking events.
Args:
user_id: User ID for the log file
Returns:
Logger instance configured for the user
"""
if user_id in _user_loggers:
return _user_loggers[user_id]
logs_dir = get_logs_dir()
os.makedirs(logs_dir, exist_ok=True)
log_file = os.path.join(logs_dir, f'tracking_user_{user_id}.log')
logger = logging.getLogger(f'tracking.user.{user_id}')
logger.setLevel(logging.INFO)
logger.propagate = False # Don't propagate to root logger
# Rotating file handler: 10MB max, keep 5 backups
handler = RotatingFileHandler(
log_file,
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5,
encoding='utf-8'
)
formatter = logging.Formatter(
'%(asctime)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
_user_loggers[user_id] = logger
return logger
def log_tracking_event(event: TrackingEvent) -> None:
"""
Log a tracking event to the user's audit log file.
Args:
event: TrackingEvent to log
"""
if not event.user_id:
# If user was deleted (anonymized), skip logging
return
logger = get_tracking_logger(event.user_id)
log_msg = (
f"user_id={event.user_id} | "
f"child_id={event.child_id} | "
f"entity_type={event.entity_type} | "
f"entity_id={event.entity_id} | "
f"action={event.action} | "
f"points_before={event.points_before} | "
f"points_after={event.points_after} | "
f"delta={event.delta:+d} | "
f"occurred_at={event.occurred_at}"
)
if event.metadata:
metadata_str = ' | '.join(f"{k}={v}" for k, v in event.metadata.items())
log_msg += f" | {metadata_str}"
logger.info(log_msg)