Moved things around
Some checks failed
Gitea Actions Demo / build-and-push (push) Failing after 6s

This commit is contained in:
2026-01-21 17:18:58 -05:00
parent a47df7171c
commit a0a059472b
160 changed files with 100 additions and 17 deletions

19
backend/.dockerignore Normal file
View File

@@ -0,0 +1,19 @@
web/
db/*.json
.venv/
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
*.so
*.egg
*.egg-info/
dist/
build/
.git/
.gitignore
.idea/
.vscode/
*.log
.DS_Store

82
backend/.gitignore vendored Normal file
View File

@@ -0,0 +1,82 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
PIPFILE.lock
# Virtual Environment
venv/
ENV/
env/
.venv
# PyCharm
.idea/
*.iml
*.iws
.idea_modules/
# VS Code
.vscode/
# Database files
*.db
*.sqlite
*.sqlite3
data/db/*.json
data/images/
# Flask
instance/
.webassets-cache
# Environment variables
.env
.env.local
.env.*.local
# Node.js / Vue (web directory)
web/node_modules/
web/npm-debug.log*
web/yarn-debug.log*
web/yarn-error.log*
web/dist/
web/.nuxt/
web/.cache/
# OS files
.DS_Store
Thumbs.db
*.swp
*.swo
*~
# Logs
*.log
logs/
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
/chore.bundle
/tree.json

18
backend/Dockerfile Normal file
View File

@@ -0,0 +1,18 @@
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 5000
ENV PYTHONUNBUFFERED=1
ENV PYTHONIOENCODING=utf-8
VOLUME ["/app/data"]
# Use Gunicorn instead of python main.py
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "-k", "gevent", "--workers", "1", "--timeout", "120", "--access-logfile", "-", "--error-logfile", "-", "--log-level", "info", "main:app"]

115
backend/Jenkinsfile vendored Normal file
View File

@@ -0,0 +1,115 @@
pipeline {
agent any
environment {
// Tag images with the build number so they are unique
APP_FRONTEND_NAME = "chore-app-frontend"
APP_BACKEND_NAME = "chore-app-backend"
IMAGE_FRONTEND_NAME = "${APP_FRONTEND_NAME}:${env.BUILD_ID}"
IMAGE_BACKEND_NAME = "${APP_BACKEND_NAME}:${env.BUILD_ID}"
CONTAINER_FRONTEND_NAME = "${APP_FRONTEND_NAME}"
CONTAINER_BACKEND_NAME = "${APP_BACKEND_NAME}"
NETWORK_NAME = "chore-app-net"
}
stages {
stage('Checkout') {
steps {
// Pulls code from your configured Git repo
checkout scm
}
}
stage('Set Version') {
steps {
script {
// Extract BASE_VERSION from Python file using grep
env.BASE_VERSION = sh(
script: "grep '^BASE_VERSION' config/version.py | cut -d'\"' -f2",
returnStdout: true
).trim()
echo "BASE_VERSION set to: ${env.BASE_VERSION}"
}
}
}
stage('Build Frontend (Vue) App') {
steps {
dir('web/vue-app') {
sh """docker build --build-arg APP_BUILD=${BUILD_NUMBER} -t ${APP_FRONTEND_NAME}:${BASE_VERSION}-${BUILD_NUMBER} ."""
}
}
}
stage('Build Backend (Flask) App') {
steps {
dir('.') {
sh """docker build --build-arg APP_BUILD=${BUILD_NUMBER} -t ${APP_BACKEND_NAME}:${BASE_VERSION}-${BUILD_NUMBER} ."""
}
}
}
stage('Deploy') {
steps {
echo 'Stopping and removing old containers...'
sh "docker stop ${CONTAINER_FRONTEND_NAME} || true"
sh "docker rm ${CONTAINER_FRONTEND_NAME} || true"
sh "docker stop ${CONTAINER_BACKEND_NAME} || true"
sh "docker rm ${CONTAINER_BACKEND_NAME} || true"
echo 'Cleaning up and creating network...'
sh "docker network rm -f ${NETWORK_NAME} || true"
sh "docker network create ${NETWORK_NAME}"
echo "Starting new containers"
sh """
docker run -d \\
--name ${CONTAINER_FRONTEND_NAME} \\
--network ${NETWORK_NAME} \\
-p 443:443 \\
${APP_FRONTEND_NAME}:${BASE_VERSION}-${BUILD_NUMBER}
"""
sh """
docker run -d \\
--name ${CONTAINER_BACKEND_NAME} \\
--network ${NETWORK_NAME} \\
-e BUILD_NUMBER=${BUILD_NUMBER} \\
-v ${CONTAINER_BACKEND_NAME}_data:/app/data \\
${APP_BACKEND_NAME}:${BASE_VERSION}-${BUILD_NUMBER}
"""
echo 'Deployment complete!'
}
}
stage('Cleanup') {
steps {
echo 'Removing old/dangling Docker images...'
// Remove all images that are not associated with a container
// This targets the untagged images created by previous builds
sh 'docker system prune -a -f'
// OPTIONAL: Remove images older than a certain time (e.g., 24 hours)
// This is useful if you want to keep the most recent successful images
// for quick rollback, but remove others.
// sh 'docker image prune -a -f --filter "until=24h"'
echo 'Docker images pruned.'
// Optional: Stop old containers and run the new ones
// Note: In production, you would push to a registry (DockerHub) instead
sh "echo 'Build Complete. Images ready: ${IMAGE_FRONTEND_NAME} and ${IMAGE_BACKEND_NAME}'"
}
}
stage('Tag Latest') {
steps {
echo 'Tagging deployed images as latest...'
sh "docker tag ${APP_FRONTEND_NAME}:${BASE_VERSION}-${BUILD_NUMBER} ${APP_FRONTEND_NAME}:latest"
sh "docker tag ${APP_BACKEND_NAME}:${BASE_VERSION}-${BUILD_NUMBER} ${APP_BACKEND_NAME}:latest"
}
}
}
}

View File

@@ -0,0 +1,47 @@
pipeline {
agent any
parameters {
// This MUST match the parameter name you defined in the Jenkins UI
string(name: 'SOURCE_TAG', defaultValue: 'latest', description: 'The local image tag (BUILD_ID) to push.')
}
environment {
// Tag images with the build number so they are unique
VUE_CONTAINER_NAME = "chore-app-frontend"
FLASK_CONTAINER_NAME = "chore-app-backend"
REGISTRY = "172.17.0.1:5900" // Your Docker Registry
REPO_FRONTEND = "${REGISTRY}/${VUE_CONTAINER_NAME}"
REPO_BACKEND = "${REGISTRY}/${FLASK_CONTAINER_NAME}"
// Use a new tag (e.g., 'production') for the permanent repo
NEW_PERM_TAG = "production"
}
stages {
stage('Pull Local Images') {
steps {
// Ensure the source images are available locally
sh "docker pull ${VUE_CONTAINER_NAME}:${params.SOURCE_TAG} || true"
sh "docker pull ${FLASK_CONTAINER_NAME}:${params.SOURCE_TAG} || true"
}
}
stage('Tag and Push Known Good') {
steps {
echo "Promoting image tag ${params.SOURCE_TAG} to ${NEW_PERM_TAG} on ${REGISTRY}"
// 1. Tag the specific local image with the permanent tag
sh "docker tag ${VUE_CONTAINER_NAME}:${params.SOURCE_TAG} ${REPO_FRONTEND}:${NEW_PERM_TAG}"
sh "docker tag ${FLASK_CONTAINER_NAME}:${params.SOURCE_TAG} ${REPO_BACKEND}:${NEW_PERM_TAG}"
// 2. Push the newly tagged images
sh "docker push ${REPO_FRONTEND}:${NEW_PERM_TAG}"
sh "docker push ${REPO_BACKEND}:${NEW_PERM_TAG}"
echo "Images pushed successfully. New permanent tag is ${NEW_PERM_TAG}."
}
}
}
}

265
backend/api/auth_api.py Normal file
View File

@@ -0,0 +1,265 @@
import logging
import secrets, jwt
from datetime import datetime, timedelta, timezone
from models.user import User
from flask import Blueprint, request, jsonify, current_app
from flask_mail import Mail, Message
from tinydb import Query
import os
from api.utils import sanitize_email
from config.paths import get_user_image_dir
from api.error_codes import MISSING_FIELDS, EMAIL_EXISTS, MISSING_TOKEN, INVALID_TOKEN, TOKEN_TIMESTAMP_MISSING, \
TOKEN_EXPIRED, ALREADY_VERIFIED, MISSING_EMAIL, USER_NOT_FOUND, MISSING_EMAIL_OR_PASSWORD, INVALID_CREDENTIALS, \
NOT_VERIFIED
from db.db import users_db
logger = logging.getLogger(__name__)
auth_api = Blueprint('auth_api', __name__)
UserQuery = Query()
mail = Mail()
TOKEN_EXPIRY_MINUTES = 60*4
RESET_PASSWORD_TOKEN_EXPIRY_MINUTES = 10
def send_verification_email(to_email, token):
verify_url = f"{current_app.config['FRONTEND_URL']}/auth/verify?token={token}"
html_body = f'Click <a href="{verify_url}">here</a> to verify your account.'
msg = Message(
subject="Verify your account",
recipients=[to_email],
html=html_body,
sender=current_app.config['MAIL_DEFAULT_SENDER']
)
mail.send(msg)
def send_reset_password_email(to_email, token):
reset_url = f"{current_app.config['FRONTEND_URL']}/auth/reset-password?token={token}"
html_body = f'Click <a href="{reset_url}">here</a> to reset your password.'
msg = Message(
subject="Reset your password",
recipients=[to_email],
html=html_body,
sender=current_app.config['MAIL_DEFAULT_SENDER']
)
mail.send(msg)
@auth_api.route('/signup', methods=['POST'])
def signup():
data = request.get_json()
required_fields = ['first_name', 'last_name', 'email', 'password']
if not all(field in data for field in required_fields):
return jsonify({'error': 'Missing required fields', 'code': MISSING_FIELDS}), 400
if users_db.search(UserQuery.email == data['email']):
return jsonify({'error': 'Email already exists', 'code': EMAIL_EXISTS}), 400
token = secrets.token_urlsafe(32)
now_iso = datetime.utcnow().isoformat()
user = User(
first_name=data['first_name'],
last_name=data['last_name'],
email=data['email'],
password=data['password'], # Hash in production!
verified=False,
verify_token=token,
verify_token_created=now_iso,
image_id="boy01"
)
users_db.insert(user.to_dict())
send_verification_email(data['email'], token)
return jsonify({'message': 'User created, verification email sent'}), 201
@auth_api.route('/verify', methods=['GET'])
def verify():
token = request.args.get('token')
status = 'success'
reason = ''
code = ''
user_dict = None
user = None
if not token:
status = 'error'
reason = 'Missing token'
code = MISSING_TOKEN
else:
user_dict = users_db.get(Query().verify_token == token)
user = User.from_dict(user_dict) if user_dict else None
if not user:
status = 'error'
reason = 'Invalid token'
code = INVALID_TOKEN
else:
created_str = user.verify_token_created
if not created_str:
status = 'error'
reason = 'Token timestamp missing'
code = TOKEN_TIMESTAMP_MISSING
else:
created_dt = datetime.fromisoformat(created_str).replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - created_dt > timedelta(minutes=TOKEN_EXPIRY_MINUTES):
status = 'error'
reason = 'Token expired'
code = TOKEN_EXPIRED
else:
user.verified = True
user.verify_token = None
user.verify_token_created = None
users_db.update(user.to_dict(), Query().email == user.email)
http_status = 200 if status == 'success' else 400
if http_status == 200 and user is not None:
if not user.email:
logger.error("Verified user has no email field.")
else:
user_image_dir = get_user_image_dir(sanitize_email(user.email))
os.makedirs(user_image_dir, exist_ok=True)
return jsonify({'status': status, 'reason': reason, 'code': code}), http_status
@auth_api.route('/resend-verify', methods=['POST'])
def resend_verify():
data = request.get_json()
email = data.get('email')
if not email:
return jsonify({'error': 'Missing email', 'code': MISSING_EMAIL}), 400
user_dict = users_db.get(UserQuery.email == email)
user = User.from_dict(user_dict) if user_dict else None
if not user:
return jsonify({'error': 'User not found', 'code': USER_NOT_FOUND}), 404
if user.verified:
return jsonify({'error': 'Account already verified', 'code': ALREADY_VERIFIED}), 400
token = secrets.token_urlsafe(32)
now_iso = datetime.utcnow().isoformat()
user.verify_token = token
user.verify_token_created = now_iso
users_db.update(user.to_dict(), UserQuery.email == email)
send_verification_email(email, token)
return jsonify({'message': 'Verification email resent'}), 200
@auth_api.route('/login', methods=['POST'])
def login():
data = request.get_json()
email = data.get('email')
password = data.get('password')
if not email or not password:
return jsonify({'error': 'Missing email or password', 'code': MISSING_EMAIL_OR_PASSWORD}), 400
user_dict = users_db.get(UserQuery.email == email)
user = User.from_dict(user_dict) if user_dict else None
if not user or user.password != password:
return jsonify({'error': 'Invalid credentials', 'code': INVALID_CREDENTIALS}), 401
if not user.verified:
return jsonify({'error': 'This account has not verified', 'code': NOT_VERIFIED}), 403
payload = {
'email': email,
'exp': datetime.utcnow() + timedelta(hours=24*7)
}
token = jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')
resp = jsonify({'message': 'Login successful'})
resp.set_cookie('token', token, httponly=True, secure=True, samesite='Strict')
return resp, 200
@auth_api.route('/me', methods=['GET'])
def me():
token = request.cookies.get('token')
if not token:
return jsonify({'error': 'Missing token', 'code': MISSING_TOKEN}), 401
try:
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
email = payload.get('email')
user_dict = users_db.get(UserQuery.email == email)
user = User.from_dict(user_dict) if user_dict else None
if not user:
return jsonify({'error': 'User not found', 'code': USER_NOT_FOUND}), 404
return jsonify({
'email': user.email,
'id': sanitize_email(user.email),
'first_name': user.first_name,
'last_name': user.last_name,
'verified': user.verified
}), 200
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token expired', 'code': TOKEN_EXPIRED}), 401
except jwt.InvalidTokenError:
return jsonify({'error': 'Invalid token', 'code': INVALID_TOKEN}), 401
@auth_api.route('/request-password-reset', methods=['POST'])
def request_password_reset():
data = request.get_json()
email = data.get('email')
success_msg = 'If this email is registered, you will receive a password reset link shortly.'
if not email:
return jsonify({'error': 'Missing email', 'code': MISSING_EMAIL}), 400
user_dict = users_db.get(UserQuery.email == email)
user = User.from_dict(user_dict) if user_dict else None
if user:
token = secrets.token_urlsafe(32)
now_iso = datetime.utcnow().isoformat()
user.reset_token = token
user.reset_token_created = now_iso
users_db.update(user.to_dict(), UserQuery.email == email)
send_reset_password_email(email, token)
return jsonify({'message': success_msg}), 200
@auth_api.route('/validate-reset-token', methods=['GET'])
def validate_reset_token():
token = request.args.get('token')
if not token:
return jsonify({'error': 'Missing token', 'code': MISSING_TOKEN}), 400
user_dict = users_db.get(UserQuery.reset_token == token)
user = User.from_dict(user_dict) if user_dict else None
if not user:
return jsonify({'error': 'Invalid token', 'code': INVALID_TOKEN}), 400
created_str = user.reset_token_created
if not created_str:
return jsonify({'error': 'Token timestamp missing', 'code': TOKEN_TIMESTAMP_MISSING}), 400
created_dt = datetime.fromisoformat(created_str).replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - created_dt > timedelta(minutes=RESET_PASSWORD_TOKEN_EXPIRY_MINUTES):
return jsonify({'error': 'Token expired', 'code': TOKEN_EXPIRED}), 400
return jsonify({'message': 'Token is valid'}), 200
@auth_api.route('/reset-password', methods=['POST'])
def reset_password():
data = request.get_json()
token = data.get('token')
new_password = data.get('password')
if not token or not new_password:
return jsonify({'error': 'Missing token or password'}), 400
user_dict = users_db.get(UserQuery.reset_token == token)
user = User.from_dict(user_dict) if user_dict else None
if not user:
return jsonify({'error': 'Invalid token', 'code': INVALID_TOKEN}), 400
created_str = user.reset_token_created
if not created_str:
return jsonify({'error': 'Token timestamp missing', 'code': TOKEN_TIMESTAMP_MISSING}), 400
created_dt = datetime.fromisoformat(created_str).replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - created_dt > timedelta(minutes=RESET_PASSWORD_TOKEN_EXPIRY_MINUTES):
return jsonify({'error': 'Token expired', 'code': TOKEN_EXPIRED}), 400
user.password = new_password # Hash in production!
user.reset_token = None
user.reset_token_created = None
users_db.update(user.to_dict(), UserQuery.email == user.email)
return jsonify({'message': 'Password has been reset'}), 200

690
backend/api/child_api.py Normal file
View File

@@ -0,0 +1,690 @@
from time import sleep
from flask import Blueprint, request, jsonify
from tinydb import Query
from api.child_rewards import ChildReward
from api.child_tasks import ChildTask
from api.pending_reward import PendingReward as PendingRewardResponse
from api.reward_status import RewardStatus
from api.utils import send_event_for_current_user
from db.db import child_db, task_db, reward_db, pending_reward_db
from events.types.child_modified import ChildModified
from events.types.child_reward_request import ChildRewardRequest
from events.types.child_reward_triggered import ChildRewardTriggered
from events.types.child_rewards_set import ChildRewardsSet
from events.types.child_task_triggered import ChildTaskTriggered
from events.types.child_tasks_set import ChildTasksSet
from events.types.event import Event
from events.types.event_types import EventType
from models.child import Child
from models.pending_reward import PendingReward
from models.reward import Reward
from models.task import Task
import logging
child_api = Blueprint('child_api', __name__)
logger = logging.getLogger(__name__)
@child_api.route('/child/<name>', methods=['GET'])
@child_api.route('/child/<id>', methods=['GET'])
def get_child(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
return jsonify(Child.from_dict(result[0]).to_dict()), 200
@child_api.route('/child/add', methods=['PUT'])
def add_child():
data = request.get_json()
name = data.get('name')
age = data.get('age')
image = data.get('image_id', None)
if not name:
return jsonify({'error': 'Name is required'}), 400
if not image:
image = 'boy01'
child = Child(name=name, age=age, image_id=image)
child_db.insert(child.to_dict())
resp = send_event_for_current_user(
Event(EventType.CHILD_MODIFIED.value, ChildModified(child.id, ChildModified.OPERATION_ADD)))
if resp:
return resp
return jsonify({'message': f'Child {name} added.'}), 201
@child_api.route('/child/<id>/edit', methods=['PUT'])
def edit_child(id):
data = request.get_json()
name = data.get('name', None)
age = data.get('age', None)
points = data.get('points', None)
image = data.get('image_id', None)
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = Child.from_dict(result[0])
if name is not None:
child.name = name
if age is not None:
child.age = age
if points is not None:
child.points = points
if image is not None:
child.image_id = image
# Check if points changed and handle pending rewards
if points is not None:
PendingQuery = Query()
pending_rewards = pending_reward_db.search(PendingQuery.child_id == id)
RewardQuery = Query()
for pr in pending_rewards:
pending = PendingReward.from_dict(pr)
reward_result = reward_db.get(RewardQuery.id == pending.reward_id)
if reward_result:
reward = Reward.from_dict(reward_result)
# If child can no longer afford the reward, remove the pending request
if child.points < reward.cost:
pending_reward_db.remove(
(PendingQuery.child_id == id) & (PendingQuery.reward_id == reward.id)
)
resp = send_event_for_current_user(
Event(EventType.CHILD_REWARD_REQUEST.value, ChildRewardRequest(id, reward.id, ChildRewardRequest.REQUEST_CANCELLED)))
if resp:
return resp
child_db.update(child.to_dict(), ChildQuery.id == id)
resp = send_event_for_current_user(Event(EventType.CHILD_MODIFIED.value, ChildModified(id, ChildModified.OPERATION_EDIT)))
if resp:
return resp
return jsonify({'message': f'Child {id} updated.'}), 200
@child_api.route('/child/list', methods=['GET'])
def list_children():
children = child_db.all()
return jsonify({'children': children}), 200
# Child DELETE
@child_api.route('/child/<id>', methods=['DELETE'])
def delete_child(id):
ChildQuery = Query()
if child_db.remove(ChildQuery.id == id):
resp = send_event_for_current_user(Event(EventType.CHILD_MODIFIED.value, ChildModified(id, ChildModified.OPERATION_DELETE)))
if resp:
return resp
return jsonify({'message': f'Child {id} deleted.'}), 200
return jsonify({'error': 'Child not found'}), 404
@child_api.route('/child/<id>/assign-task', methods=['POST'])
def assign_task_to_child(id):
data = request.get_json()
task_id = data.get('task_id')
if not task_id:
return jsonify({'error': 'task_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
if task_id not in child.get('tasks', []):
child['tasks'].append(task_id)
child_db.update({'tasks': child['tasks']}, ChildQuery.id == id)
return jsonify({'message': f'Task {task_id} assigned to {child['name']}.'}), 200
# python
@child_api.route('/child/<id>/set-tasks', methods=['PUT'])
def set_child_tasks(id):
data = request.get_json() or {}
task_ids = data.get('task_ids')
if 'type' not in data:
return jsonify({'error': 'type is required (good or bad)'}), 400
task_type = data.get('type', 'good')
if task_type not in ['good', 'bad']:
return jsonify({'error': 'type must be either good or bad'}), 400
is_good = task_type == 'good'
if not isinstance(task_ids, list):
return jsonify({'error': 'task_ids must be a list'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = Child.from_dict(result[0])
new_task_ids = set(task_ids)
# Add all existing child tasks of the opposite type
for task in task_db.all():
if task['id'] in child.tasks and task['is_good'] != is_good:
new_task_ids.add(task['id'])
# Convert back to list if needed
new_tasks = list(new_task_ids)
# Replace tasks with validated IDs
child_db.update({'tasks': new_tasks}, ChildQuery.id == id)
resp = send_event_for_current_user(Event(EventType.CHILD_TASKS_SET.value, ChildTasksSet(id, new_tasks)))
if resp:
return resp
return jsonify({
'message': f'Tasks set for child {id}.',
'task_ids': new_tasks,
'count': len(new_tasks)
}), 200
@child_api.route('/child/<id>/remove-task', methods=['POST'])
def remove_task_from_child(id):
data = request.get_json()
task_id = data.get('task_id')
if not task_id:
return jsonify({'error': 'task_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
if task_id in child.get('tasks', []):
child['tasks'].remove(task_id)
child_db.update({'tasks': child['tasks']}, ChildQuery.id == id)
return jsonify({'message': f'Task {task_id} removed from {child["name"]}.'}), 200
return jsonify({'error': 'Task not assigned to child'}), 400
@child_api.route('/child/<id>/list-tasks', methods=['GET'])
def list_child_tasks(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
task_ids = child.get('tasks', [])
TaskQuery = Query()
child_tasks = []
for tid in task_ids:
task = task_db.get(TaskQuery.id == tid)
if not task:
continue
ct = ChildTask(task.get('name'), task.get('is_good'), task.get('points'), task.get('image_id'), task.get('id'))
child_tasks.append(ct.to_dict())
return jsonify({'tasks': child_tasks}), 200
@child_api.route('/child/<id>/list-assignable-tasks', methods=['GET'])
def list_assignable_tasks(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
assigned_ids = set(child.get('tasks', []))
# Collect all task ids from the task database
all_task_ids = [t.get('id') for t in task_db.all() if t and t.get('id')]
# Filter out already assigned
assignable_ids = [tid for tid in all_task_ids if tid not in assigned_ids]
# Fetch full task details and wrap in ChildTask
TaskQuery = Query()
assignable_tasks = []
for tid in assignable_ids:
task = task_db.get(TaskQuery.id == tid)
if not task:
continue
ct = ChildTask(task.get('name'), task.get('is_good'), task.get('points'), task.get('image_id'), task.get('id'))
assignable_tasks.append(ct.to_dict())
return jsonify({'tasks': assignable_tasks, 'count': len(assignable_tasks)}), 200
@child_api.route('/child/<id>/list-all-tasks', methods=['GET'])
def list_all_tasks(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
has_type = "type" in request.args
if has_type and request.args.get('type') not in ['good', 'bad']:
return jsonify({'error': 'type must be either good or bad'}), 400
good = request.args.get('type', False) == 'good'
child = result[0]
assigned_ids = set(child.get('tasks', []))
# Get all tasks from database
all_tasks = task_db.all()
tasks = []
for task in all_tasks:
if not task or not task.get('id'):
continue
ct = ChildTask(
task.get('name'),
task.get('is_good'),
task.get('points'),
task.get('image_id'),
task.get('id')
)
task_dict = ct.to_dict()
if has_type and task.get('is_good') != good:
continue
task_dict.update({'assigned': task.get('id') in assigned_ids})
tasks.append(task_dict)
tasks.sort(key=lambda t: (not t['assigned'], t['name'].lower()))
return jsonify({ 'tasks': tasks, 'count': len(tasks), 'list_type': 'task' }), 200
@child_api.route('/child/<id>/trigger-task', methods=['POST'])
def trigger_child_task(id):
data = request.get_json()
task_id = data.get('task_id')
if not task_id:
return jsonify({'error': 'task_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child: Child = Child.from_dict(result[0])
if task_id not in child.tasks:
return jsonify({'error': f'Task not found assigned to child {child.name}'}), 404
# look up the task and get the details
TaskQuery = Query()
task_result = task_db.search(TaskQuery.id == task_id)
if not task_result:
return jsonify({'error': 'Task not found in task database'}), 404
task: Task = Task.from_dict(task_result[0])
# update the child's points based on task type
if task.is_good:
child.points += task.points
else:
child.points -= task.points
child.points = max(child.points, 0)
# update the child in the database
child_db.update({'points': child.points}, ChildQuery.id == id)
resp = send_event_for_current_user(Event(EventType.CHILD_TASK_TRIGGERED.value, ChildTaskTriggered(task.id, child.id, child.points)))
if resp:
return resp
return jsonify({'message': f'{task.name} points assigned to {child.name}.', 'points': child.points, 'id': child.id}), 200
@child_api.route('/child/<id>/assign-reward', methods=['POST'])
def assign_reward_to_child(id):
data = request.get_json()
reward_id = data.get('reward_id')
if not reward_id:
return jsonify({'error': 'reward_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
if reward_id not in child.get('rewards', []):
child['rewards'].append(reward_id)
child_db.update({'rewards': child['rewards']}, ChildQuery.id == id)
return jsonify({'message': f'Reward {reward_id} assigned to {child["name"]}.'}), 200
@child_api.route('/child/<id>/list-all-rewards', methods=['GET'])
def list_all_rewards(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
assigned_ids = set(child.get('rewards', []))
# Get all rewards from database
all_rewards = reward_db.all()
rewards = []
for reward in all_rewards:
if not reward or not reward.get('id'):
continue
cr = ChildReward(
reward.get('name'),
reward.get('cost'),
reward.get('image_id'),
reward.get('id')
)
reward_dict = cr.to_dict()
reward_dict.update({'assigned': reward.get('id') in assigned_ids})
rewards.append(reward_dict)
rewards.sort(key=lambda t: (not t['assigned'], t['name'].lower()))
return jsonify({
'rewards': rewards,
'rewards_count': len(rewards),
'list_type': 'reward'
}), 200
@child_api.route('/child/<id>/set-rewards', methods=['PUT'])
def set_child_rewards(id):
data = request.get_json() or {}
reward_ids = data.get('reward_ids')
if not isinstance(reward_ids, list):
return jsonify({'error': 'reward_ids must be a list'}), 400
# Deduplicate and drop falsy values
new_reward_ids = [rid for rid in dict.fromkeys(reward_ids) if rid]
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
# Optional: validate reward IDs exist in the reward DB
RewardQuery = Query()
valid_reward_ids = []
for rid in new_reward_ids:
if reward_db.get(RewardQuery.id == rid):
valid_reward_ids.append(rid)
# Replace rewards with validated IDs
child_db.update({'rewards': valid_reward_ids}, ChildQuery.id == id)
resp = send_event_for_current_user(Event(EventType.CHILD_REWARDS_SET.value, ChildRewardsSet(id, valid_reward_ids)))
if resp:
return resp
return jsonify({
'message': f'Rewards set for child {id}.',
'reward_ids': valid_reward_ids,
'count': len(valid_reward_ids)
}), 200
@child_api.route('/child/<id>/remove-reward', methods=['POST'])
def remove_reward_from_child(id):
data = request.get_json()
reward_id = data.get('reward_id')
if not reward_id:
return jsonify({'error': 'reward_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
if reward_id in child.get('rewards', []):
child['rewards'].remove(reward_id)
child_db.update({'rewards': child['rewards']}, ChildQuery.id == id)
return jsonify({'message': f'Reward {reward_id} removed from {child["name"]}.'}), 200
return jsonify({'error': 'Reward not assigned to child'}), 400
@child_api.route('/child/<id>/list-rewards', methods=['GET'])
def list_child_rewards(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
reward_ids = child.get('rewards', [])
RewardQuery = Query()
child_rewards = []
for rid in reward_ids:
reward = reward_db.get(RewardQuery.id == rid)
if not reward:
continue
cr = ChildReward(reward.get('name'), reward.get('cost'), reward.get('image_id'), reward.get('id'))
child_rewards.append(cr.to_dict())
return jsonify({'rewards': child_rewards}), 200
@child_api.route('/child/<id>/list-assignable-rewards', methods=['GET'])
def list_assignable_rewards(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = result[0]
assigned_ids = set(child.get('rewards', []))
all_reward_ids = [r.get('id') for r in reward_db.all() if r and r.get('id')]
assignable_ids = [rid for rid in all_reward_ids if rid not in assigned_ids]
RewardQuery = Query()
assignable_rewards = []
for rid in assignable_ids:
reward = reward_db.get(RewardQuery.id == rid)
if not reward:
continue
cr = ChildReward(reward.get('name'), reward.get('cost'), reward.get('image_id'), reward.get('id'))
assignable_rewards.append(cr.to_dict())
return jsonify({'rewards': assignable_rewards, 'count': len(assignable_rewards)}), 200
@child_api.route('/child/<id>/trigger-reward', methods=['POST'])
def trigger_child_reward(id):
data = request.get_json()
reward_id = data.get('reward_id')
if not reward_id:
return jsonify({'error': 'reward_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child: Child = Child.from_dict(result[0])
if reward_id not in child.rewards:
return jsonify({'error': f'Reward not found assigned to child {child.name}'}), 404
# look up the task and get the details
RewardQuery = Query()
reward_result = reward_db.search(RewardQuery.id == reward_id)
if not reward_result:
return jsonify({'error': 'Reward not found in reward database'}), 404
reward: Reward = Reward.from_dict(reward_result[0])
# Check if child has enough points
logger.info(f'Child {child.name} has {child.points} points, reward {reward.name} costs {reward.cost} points')
if child.points < reward.cost:
points_needed = reward.cost - child.points
return jsonify({
'error': 'Insufficient points',
'points_needed': points_needed,
'current_points': child.points,
'reward_cost': reward.cost
}), 400
# Remove matching pending reward requests for this child and reward
PendingQuery = Query()
removed = pending_reward_db.remove(
(PendingQuery.child_id == child.id) & (PendingQuery.reward_id == reward.id)
)
if removed:
send_event_for_current_user(Event(EventType.CHILD_REWARD_REQUEST.value, ChildRewardRequest(reward.id, child.id, ChildRewardRequest.REQUEST_GRANTED)))
# update the child's points based on reward cost
child.points -= reward.cost
# update the child in the database
child_db.update({'points': child.points}, ChildQuery.id == id)
send_event_for_current_user(Event(EventType.CHILD_REWARD_TRIGGERED.value, ChildRewardTriggered(reward.id, child.id, child.points)))
return jsonify({'message': f'{reward.name} assigned to {child.name}.', 'points': child.points, 'id': child.id}), 200
@child_api.route('/child/<id>/affordable-rewards', methods=['GET'])
def list_affordable_rewards(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = Child.from_dict(result[0])
points = child.points
reward_ids = child.rewards
RewardQuery = Query()
affordable = [
Reward.from_dict(reward).to_dict() for reward_id in reward_ids
if (reward := reward_db.get(RewardQuery.id == reward_id)) and points >= Reward.from_dict(reward).cost
]
return jsonify({'affordable_rewards': affordable}), 200
@child_api.route('/child/<id>/reward-status', methods=['GET'])
def reward_status(id):
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = Child.from_dict(result[0])
points = child.points
reward_ids = child.rewards
RewardQuery = Query()
statuses = []
for reward_id in reward_ids:
reward: Reward = Reward.from_dict(reward_db.get(RewardQuery.id == reward_id))
if not reward:
continue
points_needed = max(0, reward.cost - points)
#check to see if this reward id and child id is in the pending rewards db if so set its redeeming flag to true
pending_query = Query()
pending = pending_reward_db.get((pending_query.child_id == child.id) & (pending_query.reward_id == reward.id))
status = RewardStatus(reward.id, reward.name, points_needed, reward.cost, pending is not None, reward.image_id)
statuses.append(status.to_dict())
statuses.sort(key=lambda s: (not s['redeeming'], s['cost']))
return jsonify({'reward_status': statuses}), 200
@child_api.route('/child/<id>/request-reward', methods=['POST'])
def request_reward(id):
data = request.get_json()
reward_id = data.get('reward_id')
if not reward_id:
return jsonify({'error': 'reward_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = Child.from_dict(result[0])
if reward_id not in child.rewards:
return jsonify({'error': f'Reward not assigned to child {child.name}'}), 404
RewardQuery = Query()
reward_result = reward_db.search(RewardQuery.id == reward_id)
if not reward_result:
return jsonify({'error': 'Reward not found in reward database'}), 404
reward = Reward.from_dict(reward_result[0])
# Check if child has enough points
logger.info(f'Child {child.name} has {child.points} points, reward {reward.name} costs {reward.cost} points')
if child.points < reward.cost:
points_needed = reward.cost - child.points
return jsonify({
'error': 'Insufficient points',
'points_needed': points_needed,
'current_points': child.points,
'reward_cost': reward.cost
}), 400
pending = PendingReward(child_id=child.id, reward_id=reward.id)
pending_reward_db.insert(pending.to_dict())
logger.info(f'Pending reward request created for child {child.name} for reward {reward.name}')
send_event_for_current_user(Event(EventType.CHILD_REWARD_REQUEST.value, ChildRewardRequest(child.id, reward.id, ChildRewardRequest.REQUEST_CREATED)))
return jsonify({
'message': f'Reward request for {reward.name} submitted for {child.name}.',
'reward_id': reward.id,
'reward_name': reward.name,
'child_id': child.id,
'child_name': child.name,
'cost': reward.cost
}), 200
@child_api.route('/child/<id>/cancel-request-reward', methods=['POST'])
def cancel_request_reward(id):
data = request.get_json()
reward_id = data.get('reward_id')
if not reward_id:
return jsonify({'error': 'reward_id is required'}), 400
ChildQuery = Query()
result = child_db.search(ChildQuery.id == id)
if not result:
return jsonify({'error': 'Child not found'}), 404
child = Child.from_dict(result[0])
# Remove matching pending reward request
PendingQuery = Query()
removed = pending_reward_db.remove(
(PendingQuery.child_id == child.id) & (PendingQuery.reward_id == reward_id)
)
if not removed:
return jsonify({'error': 'No pending request found for this reward'}), 404
# Notify user that the request was cancelled
resp = send_event_for_current_user(Event(EventType.CHILD_REWARD_REQUEST.value, ChildRewardRequest(child.id, reward_id, ChildRewardRequest.REQUEST_CANCELLED)))
if resp:
return resp
return jsonify({
'message': f'Reward request cancelled for {child.name}.',
'child_id': child.id,
'reward_id': reward_id,
'removed_count': len(removed)
}), 200
@child_api.route('/pending-rewards', methods=['GET'])
def list_pending_rewards():
pending_rewards = pending_reward_db.all()
reward_responses = []
RewardQuery = Query()
ChildQuery = Query()
for pr in pending_rewards:
pending = PendingReward.from_dict(pr)
# Look up reward details
reward_result = reward_db.get(RewardQuery.id == pending.reward_id)
if not reward_result:
continue
reward = Reward.from_dict(reward_result)
# Look up child details
child_result = child_db.get(ChildQuery.id == pending.child_id)
if not child_result:
continue
child = Child.from_dict(child_result)
# Create response object
response = PendingRewardResponse(
_id=pending.id,
child_id=child.id,
child_name=child.name,
child_image_id=child.image_id,
reward_id=reward.id,
reward_name=reward.name,
reward_image_id=reward.image_id
)
reward_responses.append(response.to_dict())
return jsonify({'rewards': reward_responses, 'count': len(reward_responses), 'list_type': 'notification'}), 200

View File

@@ -0,0 +1,15 @@
# api/child_rewards.py
class ChildReward:
def __init__(self, name: str, cost: int, image_id: str, reward_id: str):
self.name = name
self.cost = cost
self.image_id = image_id
self.id = reward_id
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'cost': self.cost,
'image_id': self.image_id
}

View File

@@ -0,0 +1,16 @@
class ChildTask:
def __init__(self, name, is_good, points, image_id, id):
self.id = id
self.name = name
self.is_good = is_good
self.points = points
self.image_id = image_id
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'is_good': self.is_good,
'points': self.points,
'image_id': self.image_id
}

View File

@@ -0,0 +1,12 @@
MISSING_FIELDS = "MISSING_FIELDS"
EMAIL_EXISTS = "EMAIL_EXISTS"
MISSING_TOKEN = "MISSING_TOKEN"
INVALID_TOKEN = "INVALID_TOKEN"
TOKEN_TIMESTAMP_MISSING = "TOKEN_TIMESTAMP_MISSING"
TOKEN_EXPIRED = "TOKEN_EXPIRED"
MISSING_EMAIL = "MISSING_EMAIL"
USER_NOT_FOUND = "USER_NOT_FOUND"
ALREADY_VERIFIED = "ALREADY_VERIFIED"
MISSING_EMAIL_OR_PASSWORD = "MISSING_EMAIL_OR_PASSWORD"
INVALID_CREDENTIALS = "INVALID_CREDENTIALS"
NOT_VERIFIED = "NOT_VERIFIED"

108
backend/api/image_api.py Normal file
View File

@@ -0,0 +1,108 @@
import os
from PIL import Image as PILImage, UnidentifiedImageError
from flask import Blueprint, request, jsonify, send_file
from tinydb import Query
from api.utils import get_current_user_id, sanitize_email
from config.paths import get_user_image_dir
from db.db import image_db
from models.image import Image
image_api = Blueprint('image_api', __name__)
ALLOWED_EXTENSIONS = {'jpg', 'jpeg', 'png'}
IMAGE_TYPE_PROFILE = 1
IMAGE_TYPE_ICON = 2
MAX_DIMENSION = 512
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@image_api.route('/image/upload', methods=['POST'])
def upload():
user_id = get_current_user_id()
if not user_id:
return jsonify({'error': 'User not authenticated'}), 401
if 'file' not in request.files:
return jsonify({'error': 'No file part in the request'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
image_type = request.form.get('type', None)
if not image_type:
return jsonify({'error': 'Image type is required'}), 400
try:
image_type = int(image_type)
if image_type not in [IMAGE_TYPE_PROFILE, IMAGE_TYPE_ICON]:
return jsonify({'error': 'Invalid image type. Must be 1 or 2'}), 400
except ValueError:
return jsonify({'error': 'Image type must be an integer'}), 400
perm = request.form.get('permanent', "false").lower() == 'true'
if not file or not allowed_file(file.filename):
return jsonify({'error': 'Invalid file type or no file selected'}), 400
try:
pil_image = PILImage.open(file.stream)
original_format = pil_image.format # store before convert
pil_image.verify() # quick integrity check
file.stream.seek(0)
pil_image = PILImage.open(file.stream).convert('RGBA' if pil_image.mode in ('RGBA', 'LA') else 'RGB')
except UnidentifiedImageError:
return jsonify({'error': 'Uploaded file is not a valid image'}), 400
except Exception:
return jsonify({'error': 'Failed to process image'}), 400
if original_format not in ('JPEG', 'PNG'):
return jsonify({'error': 'Only JPEG and PNG images are allowed'}), 400
# Resize preserving aspect ratio (in-place thumbnail)
pil_image.thumbnail((MAX_DIMENSION, MAX_DIMENSION), PILImage.Resampling.LANCZOS)
format_extension_map = {'JPEG': '.jpg', 'PNG': '.png'}
extension = format_extension_map.get(original_format, '.png')
image_record = Image(extension=extension, permanent=perm, type=image_type, user=user_id)
filename = image_record.id + extension
filepath = os.path.abspath(os.path.join(get_user_image_dir(sanitize_email(user_id)), filename))
try:
# Save with appropriate format
save_params = {}
if pil_image.format == 'JPEG':
save_params['quality'] = 90
save_params['optimize'] = True
pil_image.save(filepath, format=pil_image.format, **save_params)
except Exception:
return jsonify({'error': 'Failed to save processed image'}), 500
image_db.insert(image_record.to_dict())
return jsonify({'message': 'Image uploaded successfully', 'filename': filename, 'id': image_record.id}), 200
@image_api.route('/image/request/<id>', methods=['GET'])
def request_image(id):
ImageQuery = Query()
image: Image = Image.from_dict(image_db.get(ImageQuery.id == id))
if not image:
return jsonify({'error': 'Image not found'}), 404
filename = f"{image.id}{image.extension}"
filepath = os.path.abspath(os.path.join(get_user_image_dir(image.user), filename))
if not os.path.exists(filepath):
return jsonify({'error': 'File not found'}), 404
return send_file(filepath)
@image_api.route('/image/list', methods=['GET'])
def list_images():
image_type = request.args.get('type', type=int)
ImageQuery = Query()
if image_type is not None:
if image_type not in [IMAGE_TYPE_PROFILE, IMAGE_TYPE_ICON]:
return jsonify({'error': 'Invalid image type'}), 400
images = image_db.search(ImageQuery.type == image_type)
else:
images = image_db.all()
image_ids = [img['id'] for img in images]
return jsonify({'ids': image_ids, 'count': len(image_ids)}), 200

View File

@@ -0,0 +1,20 @@
class PendingReward:
def __init__(self, _id, child_id, child_name, child_image_id, reward_id, reward_name, reward_image_id):
self.id = _id
self.child_id = child_id
self.child_name = child_name
self.child_image_id = child_image_id
self.reward_id = reward_id
self.reward_name = reward_name
self.reward_image_id = reward_image_id
def to_dict(self):
return {
'id': self.id,
'child_id': self.child_id,
'child_name': self.child_name,
'child_image_id': self.child_image_id,
'reward_id': self.reward_id,
'reward_name': self.reward_name,
'reward_image_id': self.reward_image_id
}

108
backend/api/reward_api.py Normal file
View File

@@ -0,0 +1,108 @@
from flask import Blueprint, request, jsonify
from tinydb import Query
from api.utils import send_event_for_current_user
from db.db import reward_db, child_db
from events.types.event import Event
from events.types.event_types import EventType
from events.types.reward_modified import RewardModified
from models.reward import Reward
reward_api = Blueprint('reward_api', __name__)
# Reward endpoints
@reward_api.route('/reward/add', methods=['PUT'])
def add_reward():
data = request.get_json()
name = data.get('name')
description = data.get('description')
cost = data.get('cost')
image = data.get('image_id', '')
if not name or description is None or cost is None:
return jsonify({'error': 'Name, description, and cost are required'}), 400
reward = Reward(name=name, description=description, cost=cost, image_id=image)
reward_db.insert(reward.to_dict())
resp = send_event_for_current_user(Event(EventType.REWARD_MODIFIED.value,
RewardModified(reward.id, RewardModified.OPERATION_ADD)))
if resp:
return resp
return jsonify({'message': f'Reward {name} added.'}), 201
@reward_api.route('/reward/<id>', methods=['GET'])
def get_reward(id):
RewardQuery = Query()
result = reward_db.search(RewardQuery.id == id)
if not result:
return jsonify({'error': 'Reward not found'}), 404
return jsonify(result[0]), 200
@reward_api.route('/reward/list', methods=['GET'])
def list_rewards():
rewards = reward_db.all()
return jsonify({'rewards': rewards}), 200
@reward_api.route('/reward/<id>', methods=['DELETE'])
def delete_reward(id):
RewardQuery = Query()
removed = reward_db.remove(RewardQuery.id == id)
if removed:
# remove the reward id from any child's reward list
ChildQuery = Query()
for child in child_db.all():
rewards = child.get('rewards', [])
if id in rewards:
rewards.remove(id)
child_db.update({'rewards': rewards}, ChildQuery.id == child.get('id'))
resp = send_event_for_current_user(Event(EventType.REWARD_MODIFIED.value,
RewardModified(id, RewardModified.OPERATION_DELETE)))
if resp:
return resp
return jsonify({'message': f'Reward {id} deleted.'}), 200
return jsonify({'error': 'Reward not found'}), 404
@reward_api.route('/reward/<id>/edit', methods=['PUT'])
def edit_reward(id):
RewardQuery = Query()
existing = reward_db.get(RewardQuery.id == id)
if not existing:
return jsonify({'error': 'Reward not found'}), 404
data = request.get_json(force=True) or {}
updates = {}
if 'name' in data:
name = (data.get('name') or '').strip()
if not name:
return jsonify({'error': 'Name cannot be empty'}), 400
updates['name'] = name
if 'description' in data:
desc = (data.get('description') or '').strip()
if not desc:
return jsonify({'error': 'Description cannot be empty'}), 400
updates['description'] = desc
if 'cost' in data:
cost = data.get('cost')
if not isinstance(cost, int):
return jsonify({'error': 'Cost must be an integer'}), 400
if cost <= 0:
return jsonify({'error': 'Cost must be a positive integer'}), 400
updates['cost'] = cost
if 'image_id' in data:
updates['image_id'] = data.get('image_id', '')
if not updates:
return jsonify({'error': 'No valid fields to update'}), 400
reward_db.update(updates, RewardQuery.id == id)
updated = reward_db.get(RewardQuery.id == id)
resp = send_event_for_current_user(Event(EventType.REWARD_MODIFIED.value,
RewardModified(id, RewardModified.OPERATION_EDIT)))
if resp:
return resp
return jsonify(updated), 200

View File

@@ -0,0 +1,18 @@
class RewardStatus:
def __init__(self, id, name, points_needed, cost, redeeming, image_id):
self.id = id
self.name = name
self.points_needed = points_needed
self.cost = cost
self.redeeming = redeeming
self.image_id = image_id
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'points_needed': self.points_needed,
'cost': self.cost,
'redeeming': self.redeeming,
'image_id': self.image_id
}

110
backend/api/task_api.py Normal file
View File

@@ -0,0 +1,110 @@
from flask import Blueprint, request, jsonify
from tinydb import Query
from api.utils import send_event_for_current_user
from db.db import task_db, child_db
from events.types.event import Event
from events.types.event_types import EventType
from events.types.task_modified import TaskModified
from models.task import Task
task_api = Blueprint('task_api', __name__)
# Task endpoints
@task_api.route('/task/add', methods=['PUT'])
def add_task():
data = request.get_json()
name = data.get('name')
points = data.get('points')
is_good = data.get('is_good')
image = data.get('image_id', '')
if not name or points is None or is_good is None:
return jsonify({'error': 'Name, points, and is_good are required'}), 400
task = Task(name=name, points=points, is_good=is_good, image_id=image)
task_db.insert(task.to_dict())
resp = send_event_for_current_user(Event(EventType.TASK_MODIFIED.value,
TaskModified(task.id, TaskModified.OPERATION_ADD)))
if resp:
return resp
return jsonify({'message': f'Task {name} added.'}), 201
@task_api.route('/task/<id>', methods=['GET'])
def get_task(id):
TaskQuery = Query()
result = task_db.search(TaskQuery.id == id)
if not result:
return jsonify({'error': 'Task not found'}), 404
return jsonify(result[0]), 200
@task_api.route('/task/list', methods=['GET'])
def list_tasks():
ids_param = request.args.get('ids')
tasks = task_db.all()
if ids_param:
ids = set(ids_param.split(','))
tasks = [task for task in tasks if task.get('id') in ids]
return jsonify({'tasks': tasks}), 200
@task_api.route('/task/<id>', methods=['DELETE'])
def delete_task(id):
TaskQuery = Query()
removed = task_db.remove(TaskQuery.id == id)
if removed:
# remove the task id from any child's task list
ChildQuery = Query()
for child in child_db.all():
tasks = child.get('tasks', [])
if id in tasks:
tasks.remove(id)
child_db.update({'tasks': tasks}, ChildQuery.id == child.get('id'))
resp = send_event_for_current_user(Event(EventType.TASK_MODIFIED.value,
TaskModified(id, TaskModified.OPERATION_DELETE)))
if resp:
return resp
return jsonify({'message': f'Task {id} deleted.'}), 200
return jsonify({'error': 'Task not found'}), 404
@task_api.route('/task/<id>/edit', methods=['PUT'])
def edit_task(id):
TaskQuery = Query()
existing = task_db.get(TaskQuery.id == id)
if not existing:
return jsonify({'error': 'Task not found'}), 404
data = request.get_json(force=True) or {}
updates = {}
if 'name' in data:
name = data.get('name', '').strip()
if not name:
return jsonify({'error': 'Name cannot be empty'}), 400
updates['name'] = name
if 'points' in data:
points = data.get('points')
if not isinstance(points, int):
return jsonify({'error': 'Points must be an integer'}), 400
if points <= 0:
return jsonify({'error': 'Points must be a positive integer'}), 400
updates['points'] = points
if 'is_good' in data:
is_good = data.get('is_good')
if not isinstance(is_good, bool):
return jsonify({'error': 'is_good must be a boolean'}), 400
updates['is_good'] = is_good
if 'image_id' in data:
updates['image_id'] = data.get('image_id', '')
if not updates:
return jsonify({'error': 'No valid fields to update'}), 400
task_db.update(updates, TaskQuery.id == id)
updated = task_db.get(TaskQuery.id == id)
resp = send_event_for_current_user(Event(EventType.TASK_MODIFIED.value,
TaskModified(id, TaskModified.OPERATION_EDIT)))
if resp:
return resp
return jsonify(updated), 200

45
backend/api/user_api.py Normal file
View File

@@ -0,0 +1,45 @@
from flask import Blueprint, request, jsonify, current_app
from models.user import User
from tinydb import Query
from db.db import users_db
import jwt
user_api = Blueprint('user_api', __name__)
UserQuery = Query()
def get_current_user():
token = request.cookies.get('token')
if not token:
return None
try:
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
email = payload.get('email')
user_dict = users_db.get(UserQuery.email == email)
return User.from_dict(user_dict) if user_dict else None
except Exception:
return None
@user_api.route('/user/profile', methods=['GET'])
def get_profile():
user = get_current_user()
if not user:
return jsonify({'error': 'Unauthorized'}), 401
return jsonify({
'first_name': user.first_name,
'last_name': user.last_name,
'email': user.email,
'image_id': user.image_id
}), 200
@user_api.route('/user/image', methods=['PUT'])
def update_image():
user = get_current_user()
if not user:
return jsonify({'error': 'Unauthorized'}), 401
data = request.get_json()
image_id = data.get('image_id')
if not image_id:
return jsonify({'error': 'Missing image_id'}), 400
user.image_id = image_id
users_db.update(user.to_dict(), UserQuery.email == user.email)
return jsonify({'message': 'Image updated', 'image_id': image_id}), 200

28
backend/api/utils.py Normal file
View File

@@ -0,0 +1,28 @@
import jwt
from flask import request, current_app, jsonify
from events.sse import send_event_to_user
def sanitize_email(email):
return email.replace('@', '_at_').replace('.', '_dot_')
def get_current_user_id():
token = request.cookies.get('token')
if not token:
return None
try:
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
email = payload.get('email')
if not email:
return None
return sanitize_email(email)
except jwt.InvalidTokenError:
return None
def send_event_for_current_user(event):
user_id = get_current_user_id()
if not user_id:
return jsonify({'error': 'Unauthorized'}), 401
send_event_to_user(user_id, event)
return None

27
backend/config/paths.py Normal file
View File

@@ -0,0 +1,27 @@
# python
# file: config/paths.py
import os
# Constant directory names
DATA_DIR_NAME = 'data'
TEST_DATA_DIR_NAME = 'test_data'
# Project root (two levels up from this file)
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def get_database_dir(db_env: str | None = None) -> str:
"""
Return the absolute base directory path for the given DB env.
db_env: 'prod' uses `data/db`, anything else uses `test_data/db`.
"""
env = (db_env or os.environ.get('DB_ENV', 'prod')).lower()
base_name = DATA_DIR_NAME if env == 'prod' else TEST_DATA_DIR_NAME
return os.path.join(PROJECT_ROOT, base_name, 'db')
def get_user_image_dir(username: str | None) -> str:
"""
Return the absolute directory path for storing images for a specific user.
"""
if username:
return os.path.join(PROJECT_ROOT, DATA_DIR_NAME, 'images', username)
return os.path.join(PROJECT_ROOT, 'resources', 'images')

12
backend/config/version.py Normal file
View File

@@ -0,0 +1,12 @@
# python
# file: config/version.py
import os
BASE_VERSION = "1.0.3" # update manually when releasing features
def get_full_version() -> str:
"""
Return semantic version with optional Jenkins build metadata, e.g. 1.2.3-456.
"""
build = os.environ.get("BUILD_NUMBER") or os.environ.get("APP_BUILD")
return f"{BASE_VERSION}-{build}" if build else BASE_VERSION

100
backend/db/db.py Normal file
View File

@@ -0,0 +1,100 @@
# python
import os
from config.paths import get_database_dir
import threading
from tinydb import TinyDB
base_dir = get_database_dir()
os.makedirs(base_dir, exist_ok=True)
class LockedTable:
"""
Thread-safe wrapper around a TinyDB table. All callable attribute access
is wrapped to acquire a reentrant lock while calling the underlying method.
Non-callable attributes are returned directly.
"""
def __init__(self, table):
self._table = table
self._lock = threading.RLock()
def __getattr__(self, name):
# avoid proxying internal attrs
if name in ('_table', '_lock'):
return super().__getattribute__(name)
attr = getattr(self._table, name)
if callable(attr):
def locked_call(*args, **kwargs):
with self._lock:
return attr(*args, **kwargs)
return locked_call
return attr
# convenience explicit methods (ensure these are class methods, not top-level)
def insert(self, *args, **kwargs):
with self._lock:
return self._table.insert(*args, **kwargs)
def insert_multiple(self, *args, **kwargs):
with self._lock:
return self._table.insert_multiple(*args, **kwargs)
def search(self, *args, **kwargs):
with self._lock:
return self._table.search(*args, **kwargs)
def get(self, *args, **kwargs):
with self._lock:
return self._table.get(*args, **kwargs)
def all(self, *args, **kwargs):
with self._lock:
return self._table.all(*args, **kwargs)
def remove(self, *args, **kwargs):
with self._lock:
return self._table.remove(*args, **kwargs)
def update(self, *args, **kwargs):
with self._lock:
return self._table.update(*args, **kwargs)
def truncate(self):
with self._lock:
return self._table.truncate()
# Setup DB files next to this module
child_path = os.path.join(base_dir, 'children.json')
task_path = os.path.join(base_dir, 'tasks.json')
reward_path = os.path.join(base_dir, 'rewards.json')
image_path = os.path.join(base_dir, 'images.json')
pending_reward_path = os.path.join(base_dir, 'pending_rewards.json')
users_path = os.path.join(base_dir, 'users.json')
# Use separate TinyDB instances/files for each collection
_child_db = TinyDB(child_path, indent=2)
_task_db = TinyDB(task_path, indent=2)
_reward_db = TinyDB(reward_path, indent=2)
_image_db = TinyDB(image_path, indent=2)
_pending_rewards_db = TinyDB(pending_reward_path, indent=2)
_users_db = TinyDB(users_path, indent=2)
# Expose table objects wrapped with locking
child_db = LockedTable(_child_db)
task_db = LockedTable(_task_db)
reward_db = LockedTable(_reward_db)
image_db = LockedTable(_image_db)
pending_reward_db = LockedTable(_pending_rewards_db)
users_db = LockedTable(_users_db)
if os.environ.get('DB_ENV', 'prod') == 'test':
child_db.truncate()
task_db.truncate()
reward_db.truncate()
image_db.truncate()
pending_reward_db.truncate()
users_db.truncate()

108
backend/db/debug.py Normal file
View File

@@ -0,0 +1,108 @@
# python
# File: db/debug.py
import random
from models.child import Child
from models.task import Task
from models.reward import Reward
from models.image import Image
from db.db import child_db, task_db, reward_db, image_db
def populate_debug_data(clear_existing=True, seed=42):
"""
Populate DBs with dummy data:
- 2 children
- 8 tasks
- 4 rewards
Each child gets 3 unique tasks and 2 unique rewards (unique within that child).
Returns a dict with inserted records.
"""
random.seed(seed)
# Optionally clear existing data
if clear_existing:
try:
child_db.truncate()
task_db.truncate()
reward_db.truncate()
image_db.truncate()
except Exception:
# fallback: remove all docs by id if truncate isn't available
from tinydb import Query
for doc in child_db.all():
child_db.remove(Query().id == doc.get('id'))
for doc in task_db.all():
task_db.remove(Query().id == doc.get('id'))
for doc in reward_db.all():
reward_db.remove(Query().id == doc.get('id'))
for doc in image_db.all():
image_db.remove(Query().id == doc.get('id'))
# Create 8 tasks
task_defs = [
("Make Bed", 2, True, 'a86feb1f-be65-4ca2-bdb5-9223ffbd6779'),
("Brush Teeth", 1, True, 'a86feb1f-be65-4ca2-bdb5-9223ffbd6779'),
("Do Homework", 5, True, 'a86feb1f-be65-4ca2-bdb5-9223ffbd6779'),
("Clean Room", 3, True),
("Practice Piano", 4, True),
("Feed Pet", 1, True, 'a86feb1f-be65-4ca2-bdb5-9223ffbd6779'),
("Take Out Trash", 2, True),
("Set Table", 1, True),
("Misc Task 1", 1, False),
("Misc Task 2", 2, False, 'a86feb1f-be65-4ca2-bdb5-9223ffbd6779'),
("Misc Task 3", 3, False),
("Misc Task 4", 4, False, 'a86feb1f-be65-4ca2-bdb5-9223ffbd6779'),
("Misc Task 5", 5, True),
]
tasks = []
for td in task_defs:
if len(td) == 4:
name, points, is_good, image = td
else:
name, points, is_good = td
image = None
t = Task(name=name, points=points, is_good=is_good, image_id=image)
task_db.insert(t.to_dict())
tasks.append(t.to_dict())
# Create 4 rewards
reward_defs = [
("Sticker Pack", "Fun stickers", 3,'a86feb1f-be65-4ca2-bdb5-9223ffbd6779'),
("Extra Screen Time", "30 minutes", 8, 'a86feb1f-be65-4ca2-bdb5-9223ffbd6779'),
("New Toy", "Small toy", 12, 'a86feb1f-be65-4ca2-bdb5-9223ffbd6779'),
("Ice Cream", "One scoop", 5, 'a86feb1f-be65-4ca2-bdb5-9223ffbd6779'),
]
rewards = []
for name, desc, cost, image in reward_defs:
r = Reward(name=name, description=desc, cost=cost, image_id=image)
reward_db.insert(r.to_dict())
rewards.append(r.to_dict())
image_db.insert(Image(1, '.png', True, id='a86feb1f-be65-4ca2-bdb5-9223ffbd6779').to_dict())
# Create 2 children and assign unique tasks/rewards per child
children = []
task_ids = [t['id'] for t in tasks]
reward_ids = [r['id'] for r in rewards]
child_names = [("Child One", 8, "boy01"), ("Child Two", 10, "girl01")]
for name, age, image in child_names:
chosen_tasks = random.sample(task_ids, 11)
chosen_rewards = random.sample(reward_ids, 2)
points = random.randint(0, 15)
c = Child(name=name, age=age, tasks=chosen_tasks, rewards=chosen_rewards, points=points, image_id=image)
child_db.insert(c.to_dict())
children.append(c.to_dict())
return {
"children": children,
"tasks": tasks,
"rewards": rewards
}
if __name__ == "__main__":
result = populate_debug_data(clear_existing=True)
print("Inserted debug data:")
print(f"Children: {[c['name'] for c in result['children']]}")
print(f"Tasks: {[t['name'] for t in result['tasks']]}")
print(f"Rewards: {[r['name'] for r in result['rewards']]}")

160
backend/db/default.py Normal file
View File

@@ -0,0 +1,160 @@
# python
# File: db/debug.py
from tinydb import Query
from api.image_api import IMAGE_TYPE_ICON, IMAGE_TYPE_PROFILE
from db.db import task_db, reward_db, image_db
from models.image import Image
from models.reward import Reward
from models.task import Task
def populate_default_data():
# Create tasks
task_defs = [
('default_001', "Be Respectful", 2, True, ''),
('default_002', "Brush Teeth", 2, True, ''),
('default_003', "Go To Bed", 2, True, ''),
('default_004', "Do What You Are Told", 2, True, ''),
('default_005', "Make Your Bed", 2, True, ''),
('default_006', "Do Homework", 2, True, ''),
]
tasks = []
for _id, name, points, is_good, image in task_defs:
t = Task(name=name, points=points, is_good=is_good, image_id=image, id=_id)
tq = Query()
_result = task_db.search(tq.id == _id)
if not _result:
task_db.insert(t.to_dict())
else:
task_db.update(t.to_dict(), tq.id == _id)
tasks.append(t.to_dict())
# Create 4 rewards
reward_defs = [
('default_001', "Special Trip", "Go to a park or a museum", 3,''),
('default_002', "Money", "Money is always nice", 8, ''),
('default_003', "Choose Dinner", "What shall we eat?", 12, 'meal'),
('default_004', "Tablet Time", "Play your games", 5, 'tablet'),
('default_005', "Computer Time", "Minecraft or Roblox?", 5, 'computer-game'),
('default_006', "TV Time", "Too much is bad for you.", 5, ''),
]
rewards = []
for _id, name, desc, cost, image in reward_defs:
r = Reward(name=name, description=desc, cost=cost, image_id=image, id=_id)
rq = Query()
_result = reward_db.search(rq.id == _id)
if not _result:
reward_db.insert(r.to_dict())
else:
reward_db.update(r.to_dict(), rq.id == _id)
rewards.append(r.to_dict())
image_defs = [
('computer-game', IMAGE_TYPE_ICON, '.png', True),
('ice-cream', IMAGE_TYPE_ICON, '.png', True),
('meal', IMAGE_TYPE_ICON, '.png', True),
('playground', IMAGE_TYPE_ICON, '.png', True),
('tablet', IMAGE_TYPE_ICON, '.png', True),
('boy01', IMAGE_TYPE_PROFILE, '.png', True),
('girl01', IMAGE_TYPE_PROFILE, '.png', True),
('girl02', IMAGE_TYPE_PROFILE, '.png', True),
('boy02', IMAGE_TYPE_PROFILE, '.png', True),
('boy03', IMAGE_TYPE_PROFILE, '.png', True),
('girl03', IMAGE_TYPE_PROFILE, '.png', True),
('boy04', IMAGE_TYPE_PROFILE, '.png', True),
('girl04', IMAGE_TYPE_PROFILE, '.png', True),
]
images = []
for _id, _type, ext, perm in image_defs:
iq = Query()
_result = image_db.search(iq.id == _id)
if not _result:
image_db.insert(Image(_type, ext, perm, id=_id).to_dict())
else:
image_db.update(Image(_type, ext, perm, id=_id).to_dict(), iq.id == _id)
images.append(Image(_type, ext, perm, id=_id).to_dict())
return {
"images": images,
"tasks": tasks,
"rewards": rewards
}
def createDefaultTasks():
"""Create default tasks if none exist."""
if len(task_db.all()) == 0:
default_tasks = [
Task(name="Take out trash", points=20, is_good=True, image_id="trash-can"),
Task(name="Make your bed", points=25, is_good=True, image_id="make-the-bed"),
Task(name="Sweep and clean kitchen", points=15, is_good=True, image_id="vacuum"),
Task(name="Do homework early", points=30, is_good=True, image_id="homework"),
Task(name="Be good for the day", points=15, is_good=True, image_id="good"),
Task(name="Clean your mess", points=20, is_good=True, image_id="broom"),
Task(name="Fighting", points=10, is_good=False, image_id="fighting"),
Task(name="Yelling at parents", points=10, is_good=False, image_id="yelling"),
Task(name="Lying", points=10, is_good=False, image_id="lying"),
Task(name="Not doing what told", points=5, is_good=False, image_id="ignore"),
Task(name="Not flushing toilet", points=5, is_good=False, image_id="toilet"),
]
for task in default_tasks:
task_db.insert(task.to_dict())
def createDefaultRewards():
"""Create default rewards if none exist."""
if len(reward_db.all()) == 0:
default_rewards = [
Reward(name="Choose meal", description="Choose dinner or lunch", cost=30, image_id="meal"),
Reward(name="$1", description="Money is always nice", cost=35, image_id='money'),
Reward(name="$5", description="Even more money", cost=120, image_id='money'),
Reward(name="Tablet 1 hour", description="Play your games", cost=50, image_id='tablet'),
Reward(name="Computer with dad", description="Let's play a game together", cost=50, image_id='games-with-dad'),
Reward(name="Computer 1 hour", description="Minecraft or Terraria?", cost=50, image_id='computer-game'),
Reward(name="TV 1 hour", description="Too much is bad for you.", cost=40, image_id='tv'),
Reward(name="Candy from store", description="Yum!", cost=70, image_id='candy'),
]
for reward in default_rewards:
reward_db.insert(reward.to_dict())
def initializeImages():
"""Initialize the image database with default images if empty."""
if len(image_db.all()) == 0:
image_defs = [
('boy01', IMAGE_TYPE_PROFILE, '.png', True),
('boy02', IMAGE_TYPE_PROFILE, '.png', True),
('boy03', IMAGE_TYPE_PROFILE, '.png', True),
('boy04', IMAGE_TYPE_PROFILE, '.png', True),
('broom', IMAGE_TYPE_ICON, '.png', True),
('candy', IMAGE_TYPE_ICON, '.png', True),
('computer-game', IMAGE_TYPE_ICON, '.png', True),
('fighting', IMAGE_TYPE_ICON, '.png', True),
('games-with-dad', IMAGE_TYPE_ICON, '.png', True),
('girl01', IMAGE_TYPE_PROFILE, '.png', True),
('girl02', IMAGE_TYPE_PROFILE, '.png', True),
('girl03', IMAGE_TYPE_PROFILE, '.png', True),
('girl04', IMAGE_TYPE_PROFILE, '.png', True),
('good', IMAGE_TYPE_ICON, '.png', True),
('homework', IMAGE_TYPE_ICON, '.png', True),
('ice-cream', IMAGE_TYPE_ICON, '.png', True),
('ignore', IMAGE_TYPE_ICON, '.png', True),
('lying', IMAGE_TYPE_ICON, '.png', True),
('make-the-bed', IMAGE_TYPE_ICON, '.png', True),
('meal', IMAGE_TYPE_ICON, '.png', True),
('money', IMAGE_TYPE_ICON, '.png', True),
('playground', IMAGE_TYPE_ICON, '.png', True),
('tablet', IMAGE_TYPE_ICON, '.png', True),
('toilet', IMAGE_TYPE_ICON, '.png', True),
('trash-can', IMAGE_TYPE_ICON, '.png', True),
('tv', IMAGE_TYPE_ICON, '.png', True),
('vacuum', IMAGE_TYPE_ICON, '.png', True),
('yelling', IMAGE_TYPE_ICON, '.png', True),
]
for _id, _type, ext, perm in image_defs:
img = Image(type=_type, extension=ext, permanent=perm)
img.id = _id
image_db.insert(img.to_dict())
if __name__ == "__main__":
result = populate_default_data()

View File

@@ -0,0 +1,10 @@
import time
from threading import Thread
class Broadcaster(Thread):
"""Background thread sending periodic notifications."""
def run(self):
while True:
#push event to all users
time.sleep(5) # Send every 5 seconds

82
backend/events/sse.py Normal file
View File

@@ -0,0 +1,82 @@
import json
import queue
import uuid
from typing import Dict, Any
import logging
from flask import Response
from events.types.event import Event
logger = logging.getLogger(__name__)
# Maps user_id → dict of {connection_id: queue}
user_queues: Dict[str, Dict[str, queue.Queue]] = {}
logging.basicConfig(level=logging.INFO)
def get_queue(user_id: str, connection_id: str) -> queue.Queue:
"""Get or create a queue for a specific user connection."""
if user_id not in user_queues:
user_queues[user_id] = {}
if connection_id not in user_queues[user_id]:
user_queues[user_id][connection_id] = queue.Queue()
return user_queues[user_id][connection_id]
def send_to_user(user_id: str, data: Dict[str, Any]):
"""Send data to all connections for a specific user."""
logger.info(f"Sending data to {user_id} user quesues are {user_queues.keys()}")
logger.info(f"Data: {data}")
if user_id in user_queues:
logger.info(f"Queued {user_id}")
# Format as SSE message once
message = f"data: {json.dumps(data)}\n\n".encode('utf-8')
# Send to all connections for this user
for connection_id, q in user_queues[user_id].items():
try:
q.put(message, block=False)
except queue.Full:
# Skip if queue is full (connection might be dead)
pass
def send_event_to_user(user_id: str, event: Event):
"""Send an Event to all connections for a specific user."""
send_to_user(user_id, event.to_dict())
def sse_response_for_user(user_id: str):
"""Create SSE response for a user connection."""
# Generate unique connection ID
connection_id = str(uuid.uuid4())
user_queue = get_queue(user_id, connection_id)
logger.info(f"SSE response for {user_id} user is {user_queue}")
def generate():
try:
while True:
# Get message from queue (blocks until available)
message = user_queue.get()
yield message
except GeneratorExit:
# Clean up when client disconnects
if user_id in user_queues and connection_id in user_queues[user_id]:
del user_queues[user_id][connection_id]
# Remove user entry if no connections remain
if not user_queues[user_id]:
del user_queues[user_id]
return Response(
generate(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no',
'Connection': 'keep-alive'
}
)

View File

@@ -0,0 +1,21 @@
from events.types.payload import Payload
class ChildModified(Payload):
OPERATION_ADD = "ADD"
OPERATION_EDIT = "EDIT"
OPERATION_DELETE = "DELETE"
def __init__(self, child_id: str, operation: str):
super().__init__({
'child_id': child_id,
'operation': operation,
})
@property
def child_id(self) -> str:
return self.get("child_id")
@property
def operation(self) -> str:
return self.get("operation")

View File

@@ -0,0 +1,27 @@
from events.types.payload import Payload
class ChildRewardRequest(Payload):
REQUEST_GRANTED = "GRANTED"
REQUEST_CREATED = "CREATED"
REQUEST_CANCELLED = "CANCELLED"
def __init__(self, child_id, reward_id: str, operation: str):
super().__init__({
'child_id': child_id,
'reward_id': reward_id,
'operation': operation
})
@property
def child_id(self) -> str:
return self.get("child_id")
@property
def reward_id(self) -> str:
return self.get("reward_id")
@property
def operation(self) -> str:
return self.get("operation")

View File

@@ -0,0 +1,24 @@
from events.types.payload import Payload
class ChildRewardTriggered(Payload):
def __init__(self, reward_id: str, child_id: str, points: int):
super().__init__({
'reward_id': reward_id,
'child_id': child_id,
'points': points
})
@property
def reward_id(self) -> str:
return self.get("reward_id")
@property
def child_id(self) -> str:
return self.get("child_id")
@property
def points(self) -> int:
return self.get("points", 0)

View File

@@ -0,0 +1,20 @@
from events.types.payload import Payload
class ChildRewardsSet(Payload):
def __init__(self, child_id: str, reward_ids: list[str]):
super().__init__({
'child_id': child_id,
'reward_ids': reward_ids
})
@property
def child_id(self) -> str:
return self.get("child_id")
@property
def reward_ids(self) -> list[str]:
return self.get("reward_ids", [])

View File

@@ -0,0 +1,24 @@
from events.types.payload import Payload
class ChildTaskTriggered(Payload):
def __init__(self, task_id: str, child_id: str, points: int):
super().__init__({
'task_id': task_id,
'child_id': child_id,
'points': points
})
@property
def task_id(self) -> str:
return self.get("task_id")
@property
def child_id(self) -> str:
return self.get("child_id")
@property
def points(self) -> int:
return self.get("points", 0)

View File

@@ -0,0 +1,19 @@
from events.types.payload import Payload
class ChildTasksSet(Payload):
def __init__(self, child_id: str, task_ids: list[str]):
super().__init__({
'child_id': child_id,
'task_ids': task_ids
})
@property
def child_id(self) -> str:
return self.get("child_id")
@property
def task_ids(self) -> list[str]:
return self.get("task_ids", [])

View File

@@ -0,0 +1,18 @@
from dataclasses import dataclass, field
import time
from events.types.payload import Payload
@dataclass
class Event:
type: str
payload: Payload
timestamp: float = field(default_factory=time.time)
def to_dict(self):
return {
'type': self.type,
'payload': self.payload.data,
'timestamp': self.timestamp
}

View File

@@ -0,0 +1,15 @@
from enum import Enum
class EventType(Enum):
CHILD_TASK_TRIGGERED = "child_task_triggered"
CHILD_TASKS_SET = "child_tasks_set"
TASK_MODIFIED = "task_modified"
REWARD_MODIFIED = "reward_modified"
CHILD_REWARD_TRIGGERED = "child_reward_triggered"
CHILD_REWARDS_SET = "child_rewards_set"
CHILD_REWARD_REQUEST = "child_reward_request"
CHILD_MODIFIED = "child_modified"

View File

@@ -0,0 +1,6 @@
class Payload:
def __init__(self, data: dict):
self.data = data
def get(self, key: str, default=None):
return self.data.get(key, default)

View File

@@ -0,0 +1,22 @@
from events.types.payload import Payload
class RewardModified(Payload):
OPERATION_ADD = "ADD"
OPERATION_EDIT = "EDIT"
OPERATION_DELETE = "DELETE"
def __init__(self, reward_id: str, operation: str):
super().__init__({
'reward_id': reward_id,
'operation': operation
})
@property
def reward_id(self) -> str:
return self.get("reward_id")
@property
def operation(self) -> str:
return self.get("operation")

View File

@@ -0,0 +1,22 @@
from events.types.payload import Payload
class TaskModified(Payload):
OPERATION_ADD = "ADD"
OPERATION_EDIT = "EDIT"
OPERATION_DELETE = "DELETE"
def __init__(self, task_id: str, operation: str):
super().__init__({
'task_id': task_id,
'operation': operation
})
@property
def task_id(self) -> str:
return self.get("task_id")
@property
def operation(self) -> str:
return self.get("operation")

89
backend/main.py Normal file
View File

@@ -0,0 +1,89 @@
import logging
import sys
from flask import Flask, request, jsonify
from flask_cors import CORS
from api.auth_api import auth_api, mail
from api.child_api import child_api
from api.image_api import image_api
from api.reward_api import reward_api
from api.task_api import task_api
from api.user_api import user_api
from config.version import get_full_version
from db.default import initializeImages, createDefaultTasks, createDefaultRewards
from events.broadcaster import Broadcaster
from events.sse import sse_response_for_user, send_to_user
# Configure logging once at application startup
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
stream=sys.stdout,
force=True # Override any existing config
)
logger = logging.getLogger(__name__)
app = Flask(__name__)
#CORS(app, resources={r"/api/*": {"origins": ["http://localhost:3000", "http://localhost:5173"]}})
app.register_blueprint(child_api)
app.register_blueprint(reward_api)
app.register_blueprint(task_api)
app.register_blueprint(image_api)
app.register_blueprint(auth_api)
app.register_blueprint(user_api)
app.config.update(
MAIL_SERVER='smtp.gmail.com',
MAIL_PORT=587,
MAIL_USE_TLS=True,
MAIL_USERNAME='ryan.kegel@gmail.com',
MAIL_PASSWORD='ruyj hxjf nmrz buar',
MAIL_DEFAULT_SENDER='ryan.kegel@gmail.com',
FRONTEND_URL='https://localhost:5173', # Adjust as needed
SECRET_KEY='supersecretkey' # Replace with a secure key in production
)
mail.init_app(app)
CORS(app)
@app.route("/version")
def api_version():
return jsonify({"version": get_full_version()})
@app.route("/events")
def events():
# Authenticate user or read a token
user_id = request.args.get("user_id")
if not user_id:
return {"error": "Missing user_id"}, 400
return sse_response_for_user(user_id)
@app.route("/notify/<user_id>")
def notify_user(user_id):
# Example trigger
send_to_user(user_id, {
"type": "notification",
"message": f"Hello {user_id}, this is a private message!"
})
return {"status": "sent"}
def start_background_threads():
broadcaster = Broadcaster()
broadcaster.daemon = True
broadcaster.start()
# TODO: implement users
initializeImages()
createDefaultTasks()
createDefaultRewards()
start_background_threads()
if __name__ == '__main__':
app.run(debug=False, host='0.0.0.0', port=5000, threaded=True)

20
backend/models/base.py Normal file
View File

@@ -0,0 +1,20 @@
# python
from dataclasses import dataclass, field
import uuid
import time
@dataclass(kw_only=True)
class BaseModel:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
def touch(self):
self.updated_at = time.time()
def to_dict(self):
return {
'id': self.id,
'created_at': self.created_at,
'updated_at': self.updated_at
}

38
backend/models/child.py Normal file
View File

@@ -0,0 +1,38 @@
from dataclasses import dataclass, field
from models.base import BaseModel
@dataclass
class Child(BaseModel):
name: str
age: int | None = None
tasks: list[str] = field(default_factory=list)
rewards: list[str] = field(default_factory=list)
points: int = 0
image_id: str | None = None
@classmethod
def from_dict(cls, d: dict):
return cls(
name=d.get('name'),
age=d.get('age'),
tasks=d.get('tasks', []),
rewards=d.get('rewards', []),
points=d.get('points', 0),
image_id=d.get('image_id'),
id=d.get('id'),
created_at=d.get('created_at'),
updated_at=d.get('updated_at')
)
def to_dict(self):
base = super().to_dict()
base.update({
'name': self.name,
'age': self.age,
'tasks': self.tasks,
'rewards': self.rewards,
'points': self.points,
'image_id': self.image_id
})
return base

33
backend/models/image.py Normal file
View File

@@ -0,0 +1,33 @@
# python
from dataclasses import dataclass
from models.base import BaseModel
@dataclass
class Image(BaseModel):
type: int
extension: str
permanent: bool = False
user: str | None = None
@classmethod
def from_dict(cls, d: dict):
# Supports overriding base fields (id, created_at, updated_at) if present
return cls(
type=d.get('type'),
extension=d.get('extension'),
permanent=d.get('permanent', False),
id=d.get('id'),
created_at=d.get('created_at'),
updated_at=d.get('updated_at'),
user=d.get('user')
)
def to_dict(self):
base = super().to_dict()
base.update({
'type': self.type,
'permanent': self.permanent,
'extension': self.extension,
'user': self.user
})
return base

View File

@@ -0,0 +1,28 @@
from dataclasses import dataclass
from models.base import BaseModel
@dataclass
class PendingReward(BaseModel):
child_id: str
reward_id: str
status: str = "pending" # pending, approved, rejected
@classmethod
def from_dict(cls, d: dict):
return cls(
child_id=d.get('child_id'),
reward_id=d.get('reward_id'),
status=d.get('status', 'pending'),
id=d.get('id'),
created_at=d.get('created_at'),
updated_at=d.get('updated_at')
)
def to_dict(self):
base = super().to_dict()
base.update({
'child_id': self.child_id,
'reward_id': self.reward_id,
'status': self.status
})
return base

33
backend/models/reward.py Normal file
View File

@@ -0,0 +1,33 @@
# python
from dataclasses import dataclass
from models.base import BaseModel
@dataclass
class Reward(BaseModel):
name: str
description: str
cost: int
image_id: str | None = None
@classmethod
def from_dict(cls, d: dict):
# Base fields are keyword-only; can be overridden if provided
return cls(
name=d.get('name'),
description=d.get('description'),
cost=d.get('cost', 0),
image_id=d.get('image_id'),
id=d.get('id'),
created_at=d.get('created_at'),
updated_at=d.get('updated_at')
)
def to_dict(self):
base = super().to_dict()
base.update({
'name': self.name,
'description': self.description,
'cost': self.cost,
'image_id': self.image_id
})
return base

31
backend/models/task.py Normal file
View File

@@ -0,0 +1,31 @@
from dataclasses import dataclass
from models.base import BaseModel
@dataclass
class Task(BaseModel):
name: str
points: int
is_good: bool
image_id: str | None = None
@classmethod
def from_dict(cls, d: dict):
return cls(
name=d.get('name'),
points=d.get('points', 0),
is_good=d.get('is_good', True),
image_id=d.get('image_id'),
id=d.get('id'),
created_at=d.get('created_at'),
updated_at=d.get('updated_at')
)
def to_dict(self):
base = super().to_dict()
base.update({
'name': self.name,
'points': self.points,
'is_good': self.is_good,
'image_id': self.image_id
})
return base

50
backend/models/user.py Normal file
View File

@@ -0,0 +1,50 @@
from dataclasses import dataclass, field
from models.base import BaseModel
@dataclass
class User(BaseModel):
first_name: str
last_name: str
email: str
password: str # In production, this should be hashed
verified: bool = False
verify_token: str | None = None
verify_token_created: str | None = None
reset_token: str | None = None
reset_token_created: str | None = None
image_id: str | None = None
@classmethod
def from_dict(cls, d: dict):
return cls(
first_name=d.get('first_name'),
last_name=d.get('last_name'),
email=d.get('email'),
password=d.get('password'),
verified=d.get('verified', False),
verify_token=d.get('verify_token'),
verify_token_created=d.get('verify_token_created'),
reset_token=d.get('reset_token'),
reset_token_created=d.get('reset_token_created'),
image_id=d.get('image_id'),
id=d.get('id'),
created_at=d.get('created_at'),
updated_at=d.get('updated_at')
)
def to_dict(self):
base = super().to_dict()
base.update({
'first_name': self.first_name,
'last_name': self.last_name,
'email': self.email,
'password': self.password,
'verified': self.verified,
'verify_token': self.verify_token,
'verify_token_created': self.verify_token_created,
'reset_token': self.reset_token,
'reset_token_created': self.reset_token_created,
'image_id': self.image_id
})
return base

6
backend/package-lock.json generated Normal file
View File

@@ -0,0 +1,6 @@
{
"name": "backend",
"lockfileVersion": 3,
"requires": true,
"packages": {}
}

BIN
backend/requirements.txt Normal file

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 19 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 49 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 51 KiB

View File

@@ -0,0 +1,6 @@
import os
import pytest
@pytest.fixture(scope="session", autouse=True)
def set_test_db_env():
os.environ['DB_ENV'] = 'test'

View File

@@ -0,0 +1,229 @@
import pytest
import os
from flask import Flask
from api.child_api import child_api
from db.db import child_db, reward_db, task_db
from tinydb import Query
from models.child import Child
@pytest.fixture
def client():
app = Flask(__name__)
app.register_blueprint(child_api)
app.config['TESTING'] = True
with app.test_client() as client:
yield client
@pytest.fixture(scope="session", autouse=True)
def cleanup_db():
yield
child_db.close()
reward_db.close()
task_db.close()
if os.path.exists('children.json'):
os.remove('children.json')
if os.path.exists('rewards.json'):
os.remove('rewards.json')
if os.path.exists('tasks.json'):
os.remove('tasks.json')
def test_add_child(client):
response = client.put('/child/add', json={'name': 'Alice', 'age': 8})
assert response.status_code == 201
assert b'Child Alice added.' in response.data
children = child_db.all()
assert any(c.get('name') == 'Alice' and c.get('age') == 8 and c.get('image_id') == 'boy01' for c in children)
response = client.put('/child/add', json={'name': 'Mike', 'age': 4, 'image_id': 'girl02'})
assert response.status_code == 201
assert b'Child Mike added.' in response.data
children = child_db.all()
assert any(c.get('name') == 'Mike' and c.get('age') == 4 and c.get('image_id') == 'girl02' for c in children)
def test_list_children(client):
child_db.truncate()
child_db.insert(Child(name='Alice', age=8, image_id="boy01").to_dict())
child_db.insert(Child(name='Mike', age=4, image_id="boy01").to_dict())
response = client.get('/child/list')
assert response.status_code == 200
data = response.json
assert len(data['children']) == 2
def test_assign_and_remove_task(client):
client.put('/child/add', json={'name': 'Bob', 'age': 10})
children = client.get('/child/list').get_json()['children']
child_id = children[0]['id']
response = client.post(f'/child/{child_id}/assign-task', json={'task_id': 'task123'})
assert response.status_code == 200
ChildQuery = Query()
child = child_db.search(ChildQuery.id == child_id)[0]
assert 'task123' in child.get('tasks', [])
response = client.post(f'/child/{child_id}/remove-task', json={'task_id': 'task123'})
assert response.status_code == 200
child = child_db.search(ChildQuery.id == child_id)[0]
assert 'task123' not in child.get('tasks', [])
def test_get_child_not_found(client):
response = client.get('/child/nonexistent-id')
assert response.status_code == 404
assert b'Child not found' in response.data
def test_remove_task_not_found(client):
response = client.post('/child/nonexistent-id/remove-task', json={'task_id': 'task123'})
assert response.status_code == 404
assert b'Child not found' in response.data
def test_remove_reward_not_found(client):
response = client.post('/child/nonexistent-id/remove-reward', json={'reward_id': 'reward123'})
assert response.status_code == 404
assert b'Child not found' in response.data
def test_affordable_rewards(client):
reward_db.insert({'id': 'r_cheep', 'name': 'Sticker', 'cost': 5})
reward_db.insert({'id': 'r_exp', 'name': 'Bike', 'cost': 20})
client.put('/child/add', json={'name': 'Charlie', 'age': 9})
children = client.get('/child/list').get_json()['children']
child_id = next((c['id'] for c in children if c.get('name') == 'Charlie'), children[0]['id'])
ChildQuery = Query()
child_db.update({'points': 10}, ChildQuery.id == child_id)
client.post(f'/child/{child_id}/assign-reward', json={'reward_id': 'r_cheep'})
client.post(f'/child/{child_id}/assign-reward', json={'reward_id': 'r_exp'})
resp = client.get(f'/child/{child_id}/affordable-rewards')
assert resp.status_code == 200
data = resp.get_json()
affordable_ids = [r['id'] for r in data['affordable_rewards']]
assert 'r_cheep' in affordable_ids and 'r_exp' not in affordable_ids
def test_reward_status(client):
reward_db.insert({'id': 'r1', 'name': 'Candy', 'cost': 3, 'image': 'candy.png'})
reward_db.insert({'id': 'r2', 'name': 'Game', 'cost': 8, 'image': 'game.png'})
reward_db.insert({'id': 'r3', 'name': 'Trip', 'cost': 15, 'image': 'trip.png'})
client.put('/child/add', json={'name': 'Dana', 'age': 11})
children = client.get('/child/list').get_json()['children']
child_id = next((c['id'] for c in children if c.get('name') == 'Dana'), children[0]['id'])
ChildQuery = Query()
child_db.update({'points': 7}, ChildQuery.id == child_id)
for rid in ['r1', 'r2', 'r3']:
client.post(f'/child/{child_id}/assign-reward', json={'reward_id': rid})
resp = client.get(f'/child/{child_id}/reward-status')
assert resp.status_code == 200
data = resp.get_json()
mapping = {s['id']: s['points_needed'] for s in data['reward_status']}
assert mapping['r1'] == 0 and mapping['r2'] == 1 and mapping['r3'] == 8
def test_list_child_tasks_returns_tasks(client):
task_db.insert({'id': 't_list_1', 'name': 'Task One', 'points': 2, 'is_good': True})
task_db.insert({'id': 't_list_2', 'name': 'Task Two', 'points': 3, 'is_good': False})
child_db.insert({
'id': 'child_list_1',
'name': 'Eve',
'age': 8,
'points': 0,
'tasks': ['t_list_1', 't_list_2', 't_missing'],
'rewards': []
})
resp = client.get('/child/child_list_1/list-tasks')
assert resp.status_code == 200
data = resp.get_json()
returned_ids = {t['id'] for t in data['tasks']}
assert returned_ids == {'t_list_1', 't_list_2'}
for t in data['tasks']:
assert 'name' in t and 'points' in t and 'is_good' in t
def test_list_assignable_tasks_returns_expected_ids(client):
child_db.truncate()
task_db.truncate()
task_db.insert({'id': 'tA', 'name': 'Task A', 'points': 1, 'is_good': True})
task_db.insert({'id': 'tB', 'name': 'Task B', 'points': 2, 'is_good': True})
task_db.insert({'id': 'tC', 'name': 'Task C', 'points': 3, 'is_good': False})
client.put('/child/add', json={'name': 'Zoe', 'age': 7})
child_id = client.get('/child/list').get_json()['children'][0]['id']
client.post(f'/child/{child_id}/assign-task', json={'task_id': 'tA'})
client.post(f'/child/{child_id}/assign-task', json={'task_id': 'tC'})
resp = client.get(f'/child/{child_id}/list-assignable-tasks')
assert resp.status_code == 200
data = resp.get_json()
assert len(data['tasks']) == 1
assert data['tasks'][0]['id'] == 'tB'
assert data['count'] == 1
def test_list_assignable_tasks_when_none_assigned(client):
child_db.truncate()
task_db.truncate()
ids = ['t1', 't2', 't3']
for i, tid in enumerate(ids, 1):
task_db.insert({'id': tid, 'name': f'Task {i}', 'points': i, 'is_good': True})
client.put('/child/add', json={'name': 'Liam', 'age': 6})
child_id = client.get('/child/list').get_json()['children'][0]['id']
resp = client.get(f'/child/{child_id}/list-assignable-tasks')
assert resp.status_code == 200
data = resp.get_json()
returned_ids = {t['id'] for t in data['tasks']}
assert returned_ids == set(ids)
assert data['count'] == len(ids)
def test_list_assignable_tasks_empty_task_db(client):
child_db.truncate()
task_db.truncate()
client.put('/child/add', json={'name': 'Mia', 'age': 5})
child_id = client.get('/child/list').get_json()['children'][0]['id']
resp = client.get(f'/child/{child_id}/list-assignable-tasks')
assert resp.status_code == 200
data = resp.get_json()
assert data['tasks'] == []
assert data['count'] == 0
def test_list_assignable_tasks_child_not_found(client):
resp = client.get('/child/does-not-exist/list-assignable-tasks')
assert resp.status_code == 404
assert b'Child not found' in resp.data
def setup_child_with_tasks(child_name='TestChild', age=8, assigned=None):
child_db.truncate()
task_db.truncate()
assigned = assigned or []
# Seed tasks
task_db.insert({'id': 't1', 'name': 'Task 1', 'points': 1, 'is_good': True})
task_db.insert({'id': 't2', 'name': 'Task 2', 'points': 2, 'is_good': False})
task_db.insert({'id': 't3', 'name': 'Task 3', 'points': 3, 'is_good': True})
# Seed child
child = Child(name=child_name, age=age, image_id='boy01').to_dict()
child['tasks'] = assigned[:]
child_db.insert(child)
return child['id']
def test_list_all_tasks_partitions_assigned_and_assignable(client):
child_id = setup_child_with_tasks(assigned=['t1', 't3'])
resp = client.get(f'/child/{child_id}/list-all-tasks')
assert resp.status_code == 200
data = resp.get_json()
assigned_ids = {t['id'] for t in data['assigned_tasks']}
assignable_ids = {t['id'] for t in data['assignable_tasks']}
assert assigned_ids == {'t1', 't3'}
assert assignable_ids == {'t2'}
assert data['assigned_count'] == 2
assert data['assignable_count'] == 1
def test_set_child_tasks_replaces_existing(client):
child_id = setup_child_with_tasks(assigned=['t1', 't2'])
# Provide new set including a valid and an invalid id (invalid should be filtered)
payload = {'task_ids': ['t3', 'missing', 't3']}
resp = client.put(f'/child/{child_id}/set-tasks', json=payload)
assert resp.status_code == 200
data = resp.get_json()
assert data['task_ids'] == ['t3']
assert data['count'] == 1
ChildQuery = Query()
child = child_db.get(ChildQuery.id == child_id)
assert child['tasks'] == ['t3']
def test_set_child_tasks_requires_list(client):
child_id = setup_child_with_tasks(assigned=['t2'])
resp = client.put(f'/child/{child_id}/set-tasks', json={'task_ids': 'not-a-list'})
assert resp.status_code == 400
assert b'task_ids must be a list' in resp.data
def test_set_child_tasks_child_not_found(client):
resp = client.put('/child/does-not-exist/set-tasks', json={'task_ids': ['t1', 't2']})
assert resp.status_code == 404
assert b'Child not found' in resp.data

View File

@@ -0,0 +1,180 @@
# python
import io
import os
from PIL import Image as PILImage
import pytest
from flask import Flask
from api.image_api import image_api, UPLOAD_FOLDER
from db.db import image_db
IMAGE_TYPE_PROFILE = 1
IMAGE_TYPE_ICON = 2
MAX_DIMENSION = 512
@pytest.fixture
def client():
app = Flask(__name__)
app.register_blueprint(image_api)
app.config['TESTING'] = True
with app.test_client() as c:
yield c
for f in os.listdir(UPLOAD_FOLDER):
os.remove(os.path.join(UPLOAD_FOLDER, f))
image_db.truncate()
def make_image_bytes(w, h, mode='RGB', color=(255, 0, 0, 255), fmt='PNG'):
img = PILImage.new(mode, (w, h), color)
bio = io.BytesIO()
img.save(bio, format=fmt)
bio.seek(0)
return bio
def list_saved_files():
return [f for f in os.listdir(UPLOAD_FOLDER) if os.path.isfile(os.path.join(UPLOAD_FOLDER, f))]
def test_upload_missing_type(client):
img = make_image_bytes(100, 100, fmt='PNG')
data = {'file': (img, 'test.png')}
resp = client.post('/image/upload', data=data, content_type='multipart/form-data')
assert resp.status_code == 400
assert b'Image type is required' in resp.data
def test_upload_invalid_type_value(client):
img = make_image_bytes(50, 50, fmt='PNG')
data = {'file': (img, 'test.png'), 'type': '3'}
resp = client.post('/image/upload', data=data, content_type='multipart/form-data')
assert resp.status_code == 400
assert b'Invalid image type. Must be 1 or 2' in resp.data
def test_upload_non_integer_type(client):
img = make_image_bytes(50, 50, fmt='PNG')
data = {'file': (img, 'test.png'), 'type': 'abc'}
resp = client.post('/image/upload', data=data, content_type='multipart/form-data')
assert resp.status_code == 400
assert b'Image type must be an integer' in resp.data
def test_upload_valid_png_with_id(client):
image_db.truncate()
img = make_image_bytes(120, 80, fmt='PNG')
data = {'file': (img, 'sample.png'), 'type': str(IMAGE_TYPE_ICON), 'permanent': 'true'}
resp = client.post('/image/upload', data=data, content_type='multipart/form-data')
assert resp.status_code == 200
j = resp.get_json()
assert 'id' in j and j['id']
assert j['message'] == 'Image uploaded successfully'
# DB entry
records = image_db.all()
assert len(records) == 1
rec = records[0]
assert rec['id'] == j['id']
assert rec['permanent'] is True
def test_upload_valid_jpeg_extension_mapping(client):
image_db.truncate()
img = make_image_bytes(100, 60, mode='RGB', fmt='JPEG')
data = {'file': (img, 'photo.jpg'), 'type': str(IMAGE_TYPE_PROFILE)}
resp = client.post('/image/upload', data=data, content_type='multipart/form-data')
assert resp.status_code == 200
j = resp.get_json()
# Expected extension should be .jpg (this will fail with current code if format lost)
filename = j['filename']
assert filename.endswith('.jpg'), "JPEG should be saved with .jpg extension (code may be using None format)"
path = os.path.join(UPLOAD_FOLDER, filename)
assert os.path.exists(path)
def test_upload_png_alpha_preserved(client):
image_db.truncate()
img = make_image_bytes(64, 64, mode='RGBA', color=(10, 20, 30, 128), fmt='PNG')
data = {'file': (img, 'alpha.png'), 'type': str(IMAGE_TYPE_ICON)}
resp = client.post('/image/upload', data=data, content_type='multipart/form-data')
assert resp.status_code == 200
j = resp.get_json()
path = os.path.join(UPLOAD_FOLDER, j['filename'])
with PILImage.open(path) as saved:
# Alpha should exist (mode RGBA); if conversion changed it incorrectly this fails
assert saved.mode in ('RGBA', 'LA')
def test_upload_large_image_resized(client):
image_db.truncate()
img = make_image_bytes(2000, 1500, fmt='PNG')
data = {'file': (img, 'large.png'), 'type': str(IMAGE_TYPE_PROFILE)}
resp = client.post('/image/upload', data=data, content_type='multipart/form-data')
assert resp.status_code == 200
j = resp.get_json()
path = os.path.join(UPLOAD_FOLDER, j['filename'])
with PILImage.open(path) as saved:
assert saved.width <= MAX_DIMENSION
assert saved.height <= MAX_DIMENSION
def test_upload_invalid_image_content(client):
bogus = io.BytesIO(b'notanimage')
data = {'file': (bogus, 'bad.png'), 'type': str(IMAGE_TYPE_ICON)}
resp = client.post('/image/upload', data=data, content_type='multipart/form-data')
assert resp.status_code == 400
# Could be 'Uploaded file is not a valid image' or 'Failed to process image'
assert b'valid image' in resp.data or b'Failed to process image' in resp.data
def test_upload_invalid_extension(client):
image_db.truncate()
img = make_image_bytes(40, 40, fmt='PNG')
data = {'file': (img, 'note.gif'), 'type': str(IMAGE_TYPE_ICON)}
resp = client.post('/image/upload', data=data, content_type='multipart/form-data')
assert resp.status_code == 400
assert b'Invalid file type' in resp.data or b'Invalid file type or no file selected' in resp.data
def test_request_image_success(client):
image_db.truncate()
img = make_image_bytes(30, 30, fmt='PNG')
data = {'file': (img, 'r.png'), 'type': str(IMAGE_TYPE_ICON)}
up = client.post('/image/upload', data=data, content_type='multipart/form-data')
assert up.status_code == 200
recs = image_db.all()
image_id = recs[0]['id']
resp = client.get(f'/image/request/{image_id}')
assert resp.status_code == 200
def test_request_image_not_found(client):
resp = client.get('/image/request/missing-id')
assert resp.status_code == 404
assert b'Image not found' in resp.data
def test_list_images_filter_type(client):
image_db.truncate()
# Upload type 1
for _ in range(2):
img = make_image_bytes(20, 20, fmt='PNG')
client.post('/image/upload', data={'file': (img, 'a.png'), 'type': '1'}, content_type='multipart/form-data')
# Upload type 2
for _ in range(3):
img = make_image_bytes(25, 25, fmt='PNG')
client.post('/image/upload', data={'file': (img, 'b.png'), 'type': '2'}, content_type='multipart/form-data')
resp = client.get('/image/list?type=2')
assert resp.status_code == 200
j = resp.get_json()
assert j['count'] == 3
def test_list_images_invalid_type_query(client):
resp = client.get('/image/list?type=99')
assert resp.status_code == 400
assert b'Invalid image type' in resp.data
def test_list_images_all(client):
image_db.truncate()
for _ in range(4):
img = make_image_bytes(10, 10, fmt='PNG')
client.post('/image/upload', data={'file': (img, 'x.png'), 'type': '2'}, content_type='multipart/form-data')
resp = client.get('/image/list')
assert resp.status_code == 200
j = resp.get_json()
assert j['count'] == 4
assert len(j['ids']) == 4
def test_permanent_flag_false_default(client):
image_db.truncate()
img = make_image_bytes(32, 32, fmt='PNG')
data = {'file': (img, 't.png'), 'type': '1'}
resp = client.post('/image/upload', data=data, content_type='multipart/form-data')
assert resp.status_code == 200
recs = image_db.all()
assert recs[0]['permanent'] is False

View File

@@ -0,0 +1,83 @@
import pytest
import os
from flask import Flask
from api.reward_api import reward_api
from db.db import reward_db, child_db
from tinydb import Query
from models.reward import Reward
@pytest.fixture
def client():
app = Flask(__name__)
app.register_blueprint(reward_api)
app.config['TESTING'] = True
with app.test_client() as client:
yield client
@pytest.fixture(scope="session", autouse=True)
def cleanup_db():
yield
reward_db.close()
if os.path.exists('rewards.json'):
os.remove('rewards.json')
def test_add_reward(client):
response = client.put('/reward/add', json={'name': 'Vacation', 'cost': 10, 'description': "A test reward"})
assert response.status_code == 201
assert b'Reward Vacation added.' in response.data
# verify in database
rewards = reward_db.all()
assert any(reward.get('name') == 'Vacation' and reward.get('cost') == 10 and reward.get('description') == "A test reward" and reward.get('image_id') == '' for reward in rewards)
response = client.put('/reward/add', json={'name': 'Ice Cream', 'cost': 4, 'description': "A test ice cream", 'image_id': 'ice_cream'})
assert response.status_code == 201
assert b'Reward Ice Cream added.' in response.data
# verify in database
rewards = reward_db.all()
assert any(reward.get('name') == 'Ice Cream' and reward.get('cost') == 4 and reward.get('description') == "A test ice cream" and reward.get('image_id') == 'ice_cream' for reward in rewards)
def test_list_rewards(client):
reward_db.truncate()
reward_db.insert(Reward(name='Reward1', description='Desc1', cost=5).to_dict())
reward_db.insert(Reward(name='Reward2', description='Desc2', cost=15, image_id='ice_cream').to_dict())
response = client.get('/reward/list')
assert response.status_code == 200
assert b'rewards' in response.data
data = response.json
assert len(data['rewards']) == 2
def test_get_reward_not_found(client):
response = client.get('/reward/nonexistent-id')
assert response.status_code == 404
assert b'Reward not found' in response.data
def test_delete_reward_not_found(client):
response = client.delete('/reward/nonexistent-id')
assert response.status_code == 404
assert b'Reward not found' in response.data
def test_delete_assigned_reward_removes_from_child(client):
# create task and child with the task already assigned
reward_db.insert({'id': 'r_delete_assigned', 'name': 'Temp Task', 'cost': 5})
child_db.insert({
'id': 'child_for_reward_delete',
'name': 'Frank',
'age': 7,
'points': 0,
'rewards': ['r_delete_assigned'],
'tasks': []
})
ChildQuery = Query()
# precondition: child has the task
assert 'r_delete_assigned' in child_db.search(ChildQuery.id == 'child_for_reward_delete')[0].get('rewards', [])
# call the delete endpoint
resp = client.delete('/reward/r_delete_assigned')
assert resp.status_code == 200
# verify the task id is no longer in the child's tasks
child = child_db.search(ChildQuery.id == 'child_for_reward_delete')[0]
assert 'r_delete_assigned' not in child.get('rewards', [])

View File

@@ -0,0 +1,83 @@
import pytest
import os
from flask import Flask
from api.task_api import task_api
from db.db import task_db, child_db
from tinydb import Query
@pytest.fixture
def client():
app = Flask(__name__)
app.register_blueprint(task_api)
app.config['TESTING'] = True
with app.test_client() as client:
yield client
@pytest.fixture(scope="session", autouse=True)
def cleanup_db():
yield
task_db.close()
if os.path.exists('tasks.json'):
os.remove('tasks.json')
def test_add_task(client):
response = client.put('/task/add', json={'name': 'Clean Room', 'points': 10, 'is_good': True})
assert response.status_code == 201
assert b'Task Clean Room added.' in response.data
# verify in database
tasks = task_db.all()
assert any(task.get('name') == 'Clean Room' and task.get('points') == 10 and task.get('is_good') is True and task.get('image_id') == '' for task in tasks)
response = client.put('/task/add', json={'name': 'Eat Dinner', 'points': 5, 'is_good': False, 'image_id': 'meal'})
assert response.status_code == 201
assert b'Task Eat Dinner added.' in response.data
# verify in database
tasks = task_db.all()
assert any(task.get('name') == 'Eat Dinner' and task.get('points') == 5 and task.get('is_good') is False and task.get('image_id') == 'meal' for task in tasks)
def test_list_tasks(client):
task_db.truncate()
task_db.insert({'id': 't1', 'name': 'Task1', 'points': 5, 'is_good': True})
task_db.insert({'id': 't2', 'name': 'Task2', 'points': 15, 'is_good': False, 'image_id': 'meal'})
response = client.get('/task/list')
assert response.status_code == 200
assert b'tasks' in response.data
data = response.json
assert len(data['tasks']) == 2
def test_get_task_not_found(client):
response = client.get('/task/nonexistent-id')
assert response.status_code == 404
assert b'Task not found' in response.data
def test_delete_task_not_found(client):
response = client.delete('/task/nonexistent-id')
assert response.status_code == 404
assert b'Task not found' in response.data
def test_delete_assigned_task_removes_from_child(client):
# create task and child with the task already assigned
task_db.insert({'id': 't_delete_assigned', 'name': 'Temp Task', 'points': 5, 'is_good': True})
child_db.insert({
'id': 'child_for_task_delete',
'name': 'Frank',
'age': 7,
'points': 0,
'tasks': ['t_delete_assigned'],
'rewards': []
})
ChildQuery = Query()
# precondition: child has the task
assert 't_delete_assigned' in child_db.search(ChildQuery.id == 'child_for_task_delete')[0].get('tasks', [])
# call the delete endpoint
resp = client.delete('/task/t_delete_assigned')
assert resp.status_code == 200
# verify the task id is no longer in the child's tasks
child = child_db.search(ChildQuery.id == 'child_for_task_delete')[0]
assert 't_delete_assigned' not in child.get('tasks', [])