feat: Enhance task and reward assignment logic to prioritize user items over system items with the same name; add corresponding tests
All checks were successful
Gitea Actions Demo / build-and-push (push) Successful in 15s

This commit is contained in:
2026-02-01 23:39:55 -05:00
parent e42c6c1ef2
commit 5351932194
4 changed files with 193 additions and 54 deletions

View File

@@ -22,6 +22,7 @@ from models.pending_reward import PendingReward
from models.reward import Reward
from models.task import Task
from api.utils import get_validated_user_id
from collections import defaultdict
import logging
child_api = Blueprint('child_api', __name__)
@@ -259,22 +260,30 @@ def list_assignable_tasks(id):
child = result[0]
assigned_ids = set(child.get('tasks', []))
# Collect all task ids from the task database
all_task_ids = [t.get('id') for t in task_db.all() if t and t.get('id')]
# Get all assignable tasks (not already assigned)
all_tasks = [t for t in task_db.all() if t and t.get('id') and t.get('id') not in assigned_ids]
# Filter out already assigned
assignable_ids = [tid for tid in all_task_ids if tid not in assigned_ids]
# Group by name
from collections import defaultdict
name_to_tasks = defaultdict(list)
for t in all_tasks:
name_to_tasks[t.get('name')].append(t)
# Fetch full task details and wrap in ChildTask
TaskQuery = Query()
assignable_tasks = []
for tid in assignable_ids:
task = task_db.get((TaskQuery.id == tid) & ((TaskQuery.user_id == user_id) | (TaskQuery.user_id == None)))
if not task:
continue
ct = ChildTask(task.get('name'), task.get('is_good'), task.get('points'), task.get('image_id'), task.get('id'))
assignable_tasks.append(ct.to_dict())
filtered_tasks = []
for name, tasks in name_to_tasks.items():
user_tasks = [t for t in tasks if t.get('user_id') is not None]
if len(user_tasks) == 0:
# Only system task exists
filtered_tasks.append(tasks[0])
elif len(user_tasks) == 1:
# Only one user task: show it, not system
filtered_tasks.append(user_tasks[0])
else:
# Multiple user tasks: show all user tasks, not system
filtered_tasks.extend(user_tasks)
# Wrap in ChildTask and return
assignable_tasks = [ChildTask(t.get('name'), t.get('is_good'), t.get('points'), t.get('image_id'), t.get('id')).to_dict() for t in filtered_tasks]
return jsonify({'tasks': assignable_tasks, 'count': len(assignable_tasks)}), 200
@@ -294,30 +303,41 @@ def list_all_tasks(id):
child = result[0]
assigned_ids = set(child.get('tasks', []))
# Get all tasks from database
# Get all tasks from database (not filtering out assigned, since this is 'all')
ChildTaskQuery = Query()
all_tasks = task_db.search((ChildTaskQuery.user_id == user_id) | (ChildTaskQuery.user_id == None))
tasks = []
name_to_tasks = defaultdict(list)
for t in all_tasks:
name_to_tasks[t.get('name')].append(t)
for task in all_tasks:
filtered_tasks = []
for name, tasks in name_to_tasks.items():
user_tasks = [t for t in tasks if t.get('user_id') is not None]
if len(user_tasks) == 0:
filtered_tasks.append(tasks[0])
elif len(user_tasks) == 1:
filtered_tasks.append(user_tasks[0])
else:
filtered_tasks.extend(user_tasks)
result_tasks = []
for t in filtered_tasks:
if has_type and t.get('is_good') != good:
continue
ct = ChildTask(
task.get('name'),
task.get('is_good'),
task.get('points'),
task.get('image_id'),
task.get('id')
t.get('name'),
t.get('is_good'),
t.get('points'),
t.get('image_id'),
t.get('id')
)
task_dict = ct.to_dict()
if has_type and task.get('is_good') != good:
continue
task_dict.update({'assigned': task.get('id') in assigned_ids})
tasks.append(task_dict)
tasks.sort(key=lambda t: (not t['assigned'], t['name'].lower()))
return jsonify({ 'tasks': tasks, 'count': len(tasks), 'list_type': 'task' }), 200
task_dict.update({'assigned': t.get('id') in assigned_ids})
result_tasks.append(task_dict)
result_tasks.sort(key=lambda t: (not t['assigned'], t['name'].lower()))
return jsonify({ 'tasks': result_tasks, 'count': len(result_tasks), 'list_type': 'task' }), 200
@child_api.route('/child/<id>/trigger-task', methods=['POST'])
@@ -394,25 +414,37 @@ def list_all_rewards(id):
# Get all rewards from database
ChildRewardQuery = Query()
all_rewards = reward_db.search((ChildRewardQuery.user_id == user_id) | (ChildRewardQuery.user_id == None))
rewards = []
for reward in all_rewards:
from collections import defaultdict
name_to_rewards = defaultdict(list)
for r in all_rewards:
name_to_rewards[r.get('name')].append(r)
filtered_rewards = []
for name, rewards in name_to_rewards.items():
user_rewards = [r for r in rewards if r.get('user_id') is not None]
if len(user_rewards) == 0:
filtered_rewards.append(rewards[0])
elif len(user_rewards) == 1:
filtered_rewards.append(user_rewards[0])
else:
filtered_rewards.extend(user_rewards)
result_rewards = []
for r in filtered_rewards:
cr = ChildReward(
reward.get('name'),
reward.get('cost'),
reward.get('image_id'),
reward.get('id')
r.get('name'),
r.get('cost'),
r.get('image_id'),
r.get('id')
)
reward_dict = cr.to_dict()
reward_dict.update({'assigned': reward.get('id') in assigned_ids})
rewards.append(reward_dict)
rewards.sort(key=lambda t: (not t['assigned'], t['name'].lower()))
reward_dict.update({'assigned': r.get('id') in assigned_ids})
result_rewards.append(reward_dict)
result_rewards.sort(key=lambda t: (not t['assigned'], t['name'].lower()))
return jsonify({
'rewards': rewards,
'rewards_count': len(rewards),
'rewards': result_rewards,
'rewards_count': len(result_rewards),
'list_type': 'reward'
}), 200
@@ -511,18 +543,26 @@ def list_assignable_rewards(id):
child = result[0]
assigned_ids = set(child.get('rewards', []))
all_reward_ids = [r.get('id') for r in reward_db.all() if r and r.get('id')]
assignable_ids = [rid for rid in all_reward_ids if rid not in assigned_ids]
# Get all assignable rewards (not already assigned)
all_rewards = [r for r in reward_db.all() if r and r.get('id') and r.get('id') not in assigned_ids]
RewardQuery = Query()
assignable_rewards = []
for rid in assignable_ids:
reward = reward_db.get((RewardQuery.id == rid) & ((RewardQuery.user_id == user_id) | (RewardQuery.user_id == None)))
if not reward:
continue
cr = ChildReward(reward.get('name'), reward.get('cost'), reward.get('image_id'), reward.get('id'))
assignable_rewards.append(cr.to_dict())
# Group by name
from collections import defaultdict
name_to_rewards = defaultdict(list)
for r in all_rewards:
name_to_rewards[r.get('name')].append(r)
filtered_rewards = []
for name, rewards in name_to_rewards.items():
user_rewards = [r for r in rewards if r.get('user_id') is not None]
if len(user_rewards) == 0:
filtered_rewards.append(rewards[0])
elif len(user_rewards) == 1:
filtered_rewards.append(user_rewards[0])
else:
filtered_rewards.extend(user_rewards)
assignable_rewards = [ChildReward(r.get('name'), r.get('cost'), r.get('image_id'), r.get('id')).to_dict() for r in filtered_rewards]
return jsonify({'rewards': assignable_rewards, 'count': len(assignable_rewards)}), 200
@child_api.route('/child/<id>/trigger-reward', methods=['POST'])