Files
chore/api/child_api.py

672 lines
24 KiB
Python

from flask import Blueprint, request, jsonify
from tinydb import Query
from db.db import child_db, task_db, reward_db, pending_reward_db
from api.reward_status import RewardStatus
from api.child_tasks import ChildTask
from api.child_rewards import ChildReward
from events.sse import send_event_to_user
from events.types.child_modified import ChildModified
from events.types.child_reward_request import ChildRewardRequest
from events.types.child_reward_triggered import ChildRewardTriggered
from events.types.child_rewards_set import ChildRewardsSet
from events.types.child_task_triggered import ChildTaskTriggered
from events.types.child_tasks_set import ChildTasksSet
from events.types.event import Event
from events.types.event_types import EventType
from api.pending_reward import PendingReward as PendingRewardResponse
from models.child import Child
from models.pending_reward import PendingReward
from models.task import Task
from models.reward import Reward
child_api = Blueprint('child_api', __name__)
@child_api.route('/child/<name>', methods=['GET'])
@child_api.route('/child/<id>', methods=['GET'])
def get_child(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
return jsonify(Child.from_dict(result[0]).to_dict()), 200
@child_api.route('/child/add', methods=['PUT'])
def add_child():
data = request.get_json()
name = data.get('name')
age = data.get('age')
image = data.get('image_id', None)
if not name:
return jsonify({'error': 'Name is required'}), 400
if not image:
image = 'boy01'
child = Child(name=name, age=age, image_id=image)
child_db.insert(child.to_dict())
send_event_to_user("user123", Event(EventType.CHILD_MODIFIED.value, ChildModified(child.id, ChildModified.OPERATION_ADD)))
return jsonify({'message': f'Child {name} added.'}), 201
@child_api.route('/child/<id>/edit', methods=['PUT'])
def edit_child(id):
data = request.get_json()
name = data.get('name', None)
age = data.get('age', None)
points = data.get('points', None)
image = data.get('image_id', None)
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = Child.from_dict(result[0])
if name is not None:
child.name = name
if age is not None:
child.age = age
if points is not None:
child.points = points
if image is not None:
child.image_id = image
# Check if points changed and handle pending rewards
if points is not None:
PendingQuery = Query()
pending_rewards = pending_reward_db.search(PendingQuery.child_id == id)
RewardQuery = Query()
for pr in pending_rewards:
pending = PendingReward.from_dict(pr)
reward_result = reward_db.get(RewardQuery.id == pending.reward_id)
if reward_result:
reward = Reward.from_dict(reward_result)
# If child can no longer afford the reward, remove the pending request
if child.points < reward.cost:
pending_reward_db.remove(
(PendingQuery.child_id == id) & (PendingQuery.reward_id == reward.id)
)
send_event_to_user(
"user123",
Event(
EventType.CHILD_REWARD_REQUEST.value,
ChildRewardRequest(id, reward.id, ChildRewardRequest.REQUEST_CANCELLED)
)
)
child_db.update(child.to_dict(), ChildQuery.id == id)
send_event_to_user("user123", Event(EventType.CHILD_MODIFIED.value, ChildModified(id, ChildModified.OPERATION_EDIT)))
return jsonify({'message': f'Child {id} updated.'}), 200
@child_api.route('/child/list', methods=['GET'])
def list_children():
children = child_db.all()
return jsonify({'children': children}), 200
# Child DELETE
@child_api.route('/child/<id>', methods=['DELETE'])
def delete_child(id):
ChildQuery = Query()
if child_db.remove(ChildQuery.id == id):
send_event_to_user("user123",
Event(EventType.CHILD_MODIFIED.value, ChildModified(id, ChildModified.OPERATION_DELETE)))
return jsonify({'message': f'Child {id} deleted.'}), 200
return jsonify({'error': 'Child not found'}), 404
@child_api.route('/child/<id>/assign-task', methods=['POST'])
def assign_task_to_child(id):
data = request.get_json()
task_id = data.get('task_id')
if not task_id:
return jsonify({'error': 'task_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
if task_id not in child.get('tasks', []):
child['tasks'].append(task_id)
child_db.update({'tasks': child['tasks']}, ChildQuery.id == id)
return jsonify({'message': f'Task {task_id} assigned to {child['name']}.'}), 200
# python
@child_api.route('/child/<id>/set-tasks', methods=['PUT'])
def set_child_tasks(id):
data = request.get_json() or {}
task_ids = data.get('task_ids')
if not isinstance(task_ids, list):
return jsonify({'error': 'task_ids must be a list'}), 400
# Deduplicate and drop falsy values
new_task_ids = [tid for tid in dict.fromkeys(task_ids) if tid]
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
# Optional: validate task IDs exist in the task DB
TaskQuery = Query()
valid_task_ids = []
for tid in new_task_ids:
if task_db.get(TaskQuery.id == tid):
valid_task_ids.append(tid)
# Replace tasks with validated IDs
child_db.update({'tasks': valid_task_ids}, ChildQuery.id == id)
send_event_to_user("user123", Event(EventType.CHILD_TASKS_SET.value, ChildTasksSet(id, valid_task_ids)))
return jsonify({
'message': f'Tasks set for child {id}.',
'task_ids': valid_task_ids,
'count': len(valid_task_ids)
}), 200
@child_api.route('/child/<id>/remove-task', methods=['POST'])
def remove_task_from_child(id):
data = request.get_json()
task_id = data.get('task_id')
if not task_id:
return jsonify({'error': 'task_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
if task_id in child.get('tasks', []):
child['tasks'].remove(task_id)
child_db.update({'tasks': child['tasks']}, ChildQuery.id == id)
return jsonify({'message': f'Task {task_id} removed from {child["name"]}.'}), 200
return jsonify({'error': 'Task not assigned to child'}), 400
@child_api.route('/child/<id>/list-tasks', methods=['GET'])
def list_child_tasks(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
task_ids = child.get('tasks', [])
TaskQuery = Query()
child_tasks = []
for tid in task_ids:
task = task_db.get(TaskQuery.id == tid)
if not task:
continue
ct = ChildTask(task.get('name'), task.get('is_good'), task.get('points'), task.get('image_id'), task.get('id'))
child_tasks.append(ct.to_dict())
return jsonify({'tasks': child_tasks}), 200
@child_api.route('/child/<id>/list-assignable-tasks', methods=['GET'])
def list_assignable_tasks(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
assigned_ids = set(child.get('tasks', []))
# Collect all task ids from the task database
all_task_ids = [t.get('id') for t in task_db.all() if t and t.get('id')]
# Filter out already assigned
assignable_ids = [tid for tid in all_task_ids if tid not in assigned_ids]
# Fetch full task details and wrap in ChildTask
TaskQuery = Query()
assignable_tasks = []
for tid in assignable_ids:
task = task_db.get(TaskQuery.id == tid)
if not task:
continue
ct = ChildTask(task.get('name'), task.get('is_good'), task.get('points'), task.get('image_id'), task.get('id'))
assignable_tasks.append(ct.to_dict())
return jsonify({'tasks': assignable_tasks, 'count': len(assignable_tasks)}), 200
@child_api.route('/child/<id>/list-all-tasks', methods=['GET'])
def list_all_tasks(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
assigned_ids = set(child.get('tasks', []))
# Get all tasks from database
all_tasks = task_db.all()
assigned_tasks = []
assignable_tasks = []
for task in all_tasks:
if not task or not task.get('id'):
continue
ct = ChildTask(
task.get('name'),
task.get('is_good'),
task.get('points'),
task.get('image_id'),
task.get('id')
)
if task.get('id') in assigned_ids:
assigned_tasks.append(ct.to_dict())
else:
assignable_tasks.append(ct.to_dict())
return jsonify({
'assigned_tasks': assigned_tasks,
'assignable_tasks': assignable_tasks,
'assigned_count': len(assigned_tasks),
'assignable_count': len(assignable_tasks)
}), 200
@child_api.route('/child/<id>/trigger-task', methods=['POST'])
def trigger_child_task(id):
data = request.get_json()
task_id = data.get('task_id')
if not task_id:
return jsonify({'error': 'task_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child: Child = Child.from_dict(result[0])
if task_id not in child.tasks:
return jsonify({'error': f'Task not found assigned to child {child.name}'}), 404
# look up the task and get the details
TaskQuery = Query()
task_result = task_db.search(TaskQuery.id == task_id)
if not task_result:
return jsonify({'error': 'Task not found in task database'}), 404
task: Task = Task.from_dict(task_result[0])
# update the child's points based on task type
if task.is_good:
child.points += task.points
else:
child.points -= task.points
child.points = max(child.points, 0)
# update the child in the database
child_db.update({'points': child.points}, ChildQuery.id == id)
send_event_to_user("user123", Event(EventType.CHILD_TASK_TRIGGERED.value, ChildTaskTriggered(task.id, child.id, child.points)))
return jsonify({'message': f'{task.name} points assigned to {child.name}.', 'points': child.points, 'id': child.id}), 200
@child_api.route('/child/<id>/assign-reward', methods=['POST'])
def assign_reward_to_child(id):
data = request.get_json()
reward_id = data.get('reward_id')
if not reward_id:
return jsonify({'error': 'reward_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
if reward_id not in child.get('rewards', []):
child['rewards'].append(reward_id)
child_db.update({'rewards': child['rewards']}, ChildQuery.id == id)
return jsonify({'message': f'Reward {reward_id} assigned to {child["name"]}.'}), 200
@child_api.route('/child/<id>/list-all-rewards', methods=['GET'])
def list_all_rewards(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
assigned_ids = set(child.get('rewards', []))
# Get all rewards from database
all_rewards = reward_db.all()
assigned_rewards = []
assignable_rewards = []
for reward in all_rewards:
if not reward or not reward.get('id'):
continue
cr = ChildReward(
reward.get('name'),
reward.get('cost'),
reward.get('image_id'),
reward.get('id')
)
if reward.get('id') in assigned_ids:
assigned_rewards.append(cr.to_dict())
else:
assignable_rewards.append(cr.to_dict())
return jsonify({
'assigned_rewards': assigned_rewards,
'assignable_rewards': assignable_rewards,
'assigned_count': len(assigned_rewards),
'assignable_count': len(assignable_rewards)
}), 200
@child_api.route('/child/<id>/set-rewards', methods=['PUT'])
def set_child_rewards(id):
data = request.get_json() or {}
reward_ids = data.get('reward_ids')
if not isinstance(reward_ids, list):
return jsonify({'error': 'reward_ids must be a list'}), 400
# Deduplicate and drop falsy values
new_reward_ids = [rid for rid in dict.fromkeys(reward_ids) if rid]
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
# Optional: validate reward IDs exist in the reward DB
RewardQuery = Query()
valid_reward_ids = []
for rid in new_reward_ids:
if reward_db.get(RewardQuery.id == rid):
valid_reward_ids.append(rid)
# Replace rewards with validated IDs
child_db.update({'rewards': valid_reward_ids}, ChildQuery.id == id)
send_event_to_user("user123", Event(EventType.CHILD_REWARDS_SET.value, ChildRewardsSet(id, valid_reward_ids)))
return jsonify({
'message': f'Rewards set for child {id}.',
'reward_ids': valid_reward_ids,
'count': len(valid_reward_ids)
}), 200
@child_api.route('/child/<id>/remove-reward', methods=['POST'])
def remove_reward_from_child(id):
data = request.get_json()
reward_id = data.get('reward_id')
if not reward_id:
return jsonify({'error': 'reward_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
if reward_id in child.get('rewards', []):
child['rewards'].remove(reward_id)
child_db.update({'rewards': child['rewards']}, ChildQuery.id == id)
return jsonify({'message': f'Reward {reward_id} removed from {child["name"]}.'}), 200
return jsonify({'error': 'Reward not assigned to child'}), 400
@child_api.route('/child/<id>/list-rewards', methods=['GET'])
def list_child_rewards(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
reward_ids = child.get('rewards', [])
RewardQuery = Query()
child_rewards = []
for rid in reward_ids:
reward = reward_db.get(RewardQuery.id == rid)
if not reward:
continue
cr = ChildReward(reward.get('name'), reward.get('cost'), reward.get('image_id'), reward.get('id'))
child_rewards.append(cr.to_dict())
return jsonify({'rewards': child_rewards}), 200
@child_api.route('/child/<id>/list-assignable-rewards', methods=['GET'])
def list_assignable_rewards(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
assigned_ids = set(child.get('rewards', []))
all_reward_ids = [r.get('id') for r in reward_db.all() if r and r.get('id')]
assignable_ids = [rid for rid in all_reward_ids if rid not in assigned_ids]
RewardQuery = Query()
assignable_rewards = []
for rid in assignable_ids:
reward = reward_db.get(RewardQuery.id == rid)
if not reward:
continue
cr = ChildReward(reward.get('name'), reward.get('cost'), reward.get('image_id'), reward.get('id'))
assignable_rewards.append(cr.to_dict())
return jsonify({'rewards': assignable_rewards, 'count': len(assignable_rewards)}), 200
@child_api.route('/child/<id>/trigger-reward', methods=['POST'])
def trigger_child_reward(id):
data = request.get_json()
reward_id = data.get('reward_id')
if not reward_id:
return jsonify({'error': 'reward_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child: Child = Child.from_dict(result[0])
if reward_id not in child.rewards:
return jsonify({'error': f'Reward not found assigned to child {child.name}'}), 404
# look up the task and get the details
RewardQuery = Query()
reward_result = reward_db.search(RewardQuery.id == reward_id)
if not reward_result:
return jsonify({'error': 'Reward not found in reward database'}), 404
reward: Reward = Reward.from_dict(reward_result[0])
# Remove matching pending reward requests for this child and reward
PendingQuery = Query()
removed = pending_reward_db.remove(
(PendingQuery.child_id == child.id) & (PendingQuery.reward_id == reward.id)
)
if removed:
send_event_to_user("user123", Event(EventType.CHILD_REWARD_REQUEST.value, ChildRewardRequest(reward.id, child.id, ChildRewardRequest.REQUEST_GRANTED)))
# update the child's points based on reward cost
child.points -= reward.cost
# update the child in the database
child_db.update({'points': child.points}, ChildQuery.id == id)
send_event_to_user("user123", Event(EventType.CHILD_REWARD_TRIGGERED.value, ChildRewardTriggered(reward.id, child.id, child.points)))
return jsonify({'message': f'{reward.name} assigned to {child.name}.', 'points': child.points, 'id': child.id}), 200
@child_api.route('/child/<id>/affordable-rewards', methods=['GET'])
def list_affordable_rewards(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = Child.from_dict(result[0])
points = child.points
reward_ids = child.rewards
RewardQuery = Query()
affordable = [
Reward.from_dict(reward).to_dict() for reward_id in reward_ids
if (reward := reward_db.get(RewardQuery.id == reward_id)) and points >= Reward.from_dict(reward).cost
]
return jsonify({'affordable_rewards': affordable}), 200
@child_api.route('/child/<id>/reward-status', methods=['GET'])
def reward_status(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = Child.from_dict(result[0])
points = child.points
reward_ids = child.rewards
RewardQuery = Query()
statuses = []
for reward_id in reward_ids:
reward: Reward = Reward.from_dict(reward_db.get(RewardQuery.id == reward_id))
if not reward:
continue
points_needed = max(0, reward.cost - points)
#check to see if this reward id and child id is in the pending rewards db if so set its redeeming flag to true
pending_query = Query()
pending = pending_reward_db.get((pending_query.child_id == child.id) & (pending_query.reward_id == reward.id))
status = RewardStatus(reward.id, reward.name, points_needed, reward.cost, pending is not None, reward.image_id)
statuses.append(status.to_dict())
statuses.sort(key=lambda s: (not s['redeeming'], s['cost']))
return jsonify({'reward_status': statuses}), 200
@child_api.route('/child/<id>/request-reward', methods=['POST'])
def request_reward(id):
data = request.get_json()
reward_id = data.get('reward_id')
if not reward_id:
return jsonify({'error': 'reward_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = Child.from_dict(result[0])
if reward_id not in child.rewards:
return jsonify({'error': f'Reward not assigned to child {child.name}'}), 404
RewardQuery = Query()
reward_result = reward_db.search(RewardQuery.id == reward_id)
if not reward_result:
return jsonify({'error': 'Reward not found in reward database'}), 404
reward = Reward.from_dict(reward_result[0])
# Check if child has enough points
if child.points < reward.cost:
points_needed = reward.cost - child.points
return jsonify({
'error': 'Insufficient points',
'points_needed': points_needed,
'current_points': child.points,
'reward_cost': reward.cost
}), 400
pending = PendingReward(child_id=child.id, reward_id=reward.id)
pending_reward_db.insert(pending.to_dict())
send_event_to_user("user123", Event(EventType.CHILD_REWARD_REQUEST.value, ChildRewardRequest(child.id, reward.id, ChildRewardRequest.REQUEST_CREATED)))
return jsonify({
'message': f'Reward request for {reward.name} submitted for {child.name}.',
'reward_id': reward.id,
'reward_name': reward.name,
'child_id': child.id,
'child_name': child.name,
'cost': reward.cost
}), 200
@child_api.route('/child/<id>/cancel-request-reward', methods=['POST'])
def cancel_request_reward(id):
data = request.get_json()
reward_id = data.get('reward_id')
if not reward_id:
return jsonify({'error': 'reward_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = Child.from_dict(result[0])
# Remove matching pending reward request
PendingQuery = Query()
removed = pending_reward_db.remove(
(PendingQuery.child_id == child.id) & (PendingQuery.reward_id == reward_id)
)
if not removed:
return jsonify({'error': 'No pending request found for this reward'}), 404
# Notify user that the request was cancelled
send_event_to_user(
"user123",
Event(
EventType.CHILD_REWARD_REQUEST.value,
ChildRewardRequest(child.id, reward_id, ChildRewardRequest.REQUEST_CANCELLED)
)
)
return jsonify({
'message': f'Reward request cancelled for {child.name}.',
'child_id': child.id,
'reward_id': reward_id,
'removed_count': len(removed)
}), 200
@child_api.route('/pending-rewards', methods=['GET'])
def list_pending_rewards():
pending_rewards = pending_reward_db.all()
reward_responses = []
RewardQuery = Query()
ChildQuery = Query()
for pr in pending_rewards:
pending = PendingReward.from_dict(pr)
# Look up reward details
reward_result = reward_db.get(RewardQuery.id == pending.reward_id)
if not reward_result:
continue
reward = Reward.from_dict(reward_result)
# Look up child details
child_result = child_db.get(ChildQuery.id == pending.child_id)
if not child_result:
continue
child = Child.from_dict(child_result)
# Create response object
response = PendingRewardResponse(
_id=pending.id,
child_id=child.id,
child_name=child.name,
child_image_id=child.image_id,
reward_id=reward.id,
reward_name=reward.name,
reward_image_id=reward.image_id
)
reward_responses.append(response.to_dict())
return jsonify({'rewards': reward_responses}), 200