feat: Implement admin role validation and enhance user management scripts
All checks were successful
Gitea Actions Demo / build-and-push (push) Successful in 17s
All checks were successful
Gitea Actions Demo / build-and-push (push) Successful in 17s
This commit is contained in:
@@ -195,7 +195,7 @@ In `backend/main.py`, import and call `start_deletion_scheduler()` after Flask a
|
||||
- [x] Implement `PUT /admin/deletion-threshold` endpoint
|
||||
- [x] Implement `POST /admin/deletion-queue/trigger` endpoint
|
||||
- [x] Add JWT authentication checks for all admin endpoints
|
||||
- [ ] Add admin role validation
|
||||
- [x] Add admin role validation
|
||||
|
||||
### SSE Event
|
||||
|
||||
@@ -253,7 +253,7 @@ In `backend/main.py`, import and call `start_deletion_scheduler()` after Flask a
|
||||
- [x] Test `GET /admin/deletion-threshold` returns current threshold
|
||||
- [x] Test `PUT /admin/deletion-threshold` updates threshold
|
||||
- [x] Test `PUT /admin/deletion-threshold` validates min/max
|
||||
- [ ] Test `PUT /admin/deletion-threshold` requires admin role
|
||||
- [x] Test `PUT /admin/deletion-threshold` requires admin role
|
||||
- [x] Test `POST /admin/deletion-queue/trigger` triggers scheduler
|
||||
- [x] Test `POST /admin/deletion-queue/trigger` returns summary
|
||||
|
||||
|
||||
15
README.md
15
README.md
@@ -77,7 +77,16 @@ export ACCOUNT_DELETION_THRESHOLD_HOURS=168 # 7 days
|
||||
|
||||
### Admin Endpoints
|
||||
|
||||
All admin endpoints require JWT authentication and admin role.
|
||||
All admin endpoints require JWT authentication and **admin role**.
|
||||
|
||||
**Note:** Admin users must be created manually or via the provided script (`backend/scripts/create_admin.py`). The admin role cannot be assigned through the signup API for security reasons.
|
||||
|
||||
**Creating an Admin User:**
|
||||
|
||||
```bash
|
||||
cd backend
|
||||
python scripts/create_admin.py
|
||||
```
|
||||
|
||||
#### Account Deletion Management
|
||||
|
||||
@@ -123,7 +132,9 @@ npm run test
|
||||
## 🔒 Security
|
||||
|
||||
- JWT tokens stored in HttpOnly, Secure, SameSite=Strict cookies
|
||||
- Admin-only endpoints protected by role validation
|
||||
- **Role-Based Access Control (RBAC)**: Admin endpoints protected by admin role validation
|
||||
- Admin users can only be created via direct database manipulation or provided script
|
||||
- Regular users cannot escalate privileges to admin
|
||||
- Account deletion requires email confirmation
|
||||
- Marked accounts blocked from login immediately
|
||||
|
||||
|
||||
@@ -18,8 +18,7 @@ admin_api = Blueprint('admin_api', __name__)
|
||||
|
||||
def admin_required(f):
|
||||
"""
|
||||
Decorator to require admin authentication for endpoints.
|
||||
For now, this is a placeholder - you should implement proper admin role checking.
|
||||
Decorator to require admin role for endpoints.
|
||||
"""
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
@@ -43,12 +42,14 @@ def admin_required(f):
|
||||
if not user_dict:
|
||||
return jsonify({'error': 'User not found', 'code': 'USER_NOT_FOUND'}), 404
|
||||
|
||||
# TODO: Check if user has admin role
|
||||
# For now, all authenticated users can access admin endpoints
|
||||
# In production, you should check user.role == 'admin' or similar
|
||||
user = User.from_dict(user_dict)
|
||||
|
||||
# Check if user has admin role
|
||||
if user.role != 'admin':
|
||||
return jsonify({'error': 'Admin access required', 'code': 'ADMIN_REQUIRED'}), 403
|
||||
|
||||
# Pass user to the endpoint
|
||||
request.current_user = User.from_dict(user_dict)
|
||||
request.current_user = user
|
||||
|
||||
except jwt.ExpiredSignatureError:
|
||||
return jsonify({'error': 'Token expired', 'code': 'TOKEN_EXPIRED'}), 401
|
||||
|
||||
@@ -20,6 +20,7 @@ class User(BaseModel):
|
||||
marked_for_deletion_at: str | None = None
|
||||
deletion_in_progress: bool = False
|
||||
deletion_attempted_at: str | None = None
|
||||
role: str = 'user'
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: dict):
|
||||
@@ -41,6 +42,7 @@ class User(BaseModel):
|
||||
marked_for_deletion_at=d.get('marked_for_deletion_at'),
|
||||
deletion_in_progress=d.get('deletion_in_progress', False),
|
||||
deletion_attempted_at=d.get('deletion_attempted_at'),
|
||||
role=d.get('role', 'user'),
|
||||
id=d.get('id'),
|
||||
created_at=d.get('created_at'),
|
||||
updated_at=d.get('updated_at')
|
||||
@@ -66,6 +68,7 @@ class User(BaseModel):
|
||||
'marked_for_deletion': self.marked_for_deletion,
|
||||
'marked_for_deletion_at': self.marked_for_deletion_at,
|
||||
'deletion_in_progress': self.deletion_in_progress,
|
||||
'deletion_attempted_at': self.deletion_attempted_at
|
||||
'deletion_attempted_at': self.deletion_attempted_at,
|
||||
'role': self.role
|
||||
})
|
||||
return base
|
||||
|
||||
58
backend/scripts/README.md
Normal file
58
backend/scripts/README.md
Normal file
@@ -0,0 +1,58 @@
|
||||
# Backend Scripts
|
||||
|
||||
Utility scripts for backend management tasks.
|
||||
|
||||
## create_admin.py
|
||||
|
||||
Creates an admin user account with elevated privileges.
|
||||
|
||||
### Usage
|
||||
|
||||
```bash
|
||||
cd backend
|
||||
python scripts/create_admin.py
|
||||
```
|
||||
|
||||
The script will prompt you for:
|
||||
|
||||
- Email address
|
||||
- Password (minimum 8 characters)
|
||||
- First name
|
||||
- Last name
|
||||
|
||||
### Security Notes
|
||||
|
||||
- Admin users can only be created through this script or direct database manipulation
|
||||
- The admin role cannot be assigned through the signup API
|
||||
- Existing email addresses will be rejected
|
||||
- Passwords are hashed using werkzeug's secure hash algorithm
|
||||
|
||||
### Example
|
||||
|
||||
```bash
|
||||
$ python scripts/create_admin.py
|
||||
=== Create Admin User ===
|
||||
|
||||
Email: admin@example.com
|
||||
Password: ********
|
||||
First name: Admin
|
||||
Last name: User
|
||||
|
||||
Create admin user 'admin@example.com'? (yes/no): yes
|
||||
✓ Admin user created successfully!
|
||||
Email: admin@example.com
|
||||
Name: Admin User
|
||||
Role: admin
|
||||
```
|
||||
|
||||
## Requirements
|
||||
|
||||
The script requires the backend virtual environment to be activated:
|
||||
|
||||
```bash
|
||||
# Windows
|
||||
.venv\Scripts\activate
|
||||
|
||||
# Linux/Mac
|
||||
source .venv/bin/activate
|
||||
```
|
||||
64
backend/scripts/create_admin.py
Normal file
64
backend/scripts/create_admin.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""
|
||||
Script to create an admin user account.
|
||||
Usage: python backend/scripts/create_admin.py
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
|
||||
from db.db import users_db
|
||||
from models.user import User
|
||||
from werkzeug.security import generate_password_hash
|
||||
from tinydb import Query
|
||||
import uuid
|
||||
|
||||
def create_admin_user(email: str, password: str, first_name: str, last_name: str):
|
||||
"""Create an admin user account."""
|
||||
|
||||
# Check if user already exists
|
||||
Query_ = Query()
|
||||
existing_user = users_db.get(Query_.email == email)
|
||||
|
||||
if existing_user:
|
||||
print(f"Error: User with email {email} already exists")
|
||||
return False
|
||||
|
||||
admin = User(
|
||||
id=str(uuid.uuid4()),
|
||||
email=email,
|
||||
first_name=first_name,
|
||||
last_name=last_name,
|
||||
password=generate_password_hash(password),
|
||||
verified=True,
|
||||
role='admin'
|
||||
)
|
||||
|
||||
users_db.insert(admin.to_dict())
|
||||
print(f"✓ Admin user created successfully!")
|
||||
print(f" Email: {email}")
|
||||
print(f" Name: {first_name} {last_name}")
|
||||
print(f" Role: admin")
|
||||
return True
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("=== Create Admin User ===\n")
|
||||
|
||||
email = input("Email: ").strip()
|
||||
password = input("Password: ").strip()
|
||||
first_name = input("First name: ").strip()
|
||||
last_name = input("Last name: ").strip()
|
||||
|
||||
if not all([email, password, first_name, last_name]):
|
||||
print("Error: All fields are required")
|
||||
sys.exit(1)
|
||||
|
||||
if len(password) < 8:
|
||||
print("Error: Password must be at least 8 characters")
|
||||
sys.exit(1)
|
||||
|
||||
confirm = input(f"\nCreate admin user '{email}'? (yes/no): ").strip().lower()
|
||||
|
||||
if confirm == 'yes':
|
||||
create_admin_user(email, password, first_name, last_name)
|
||||
else:
|
||||
print("Cancelled")
|
||||
@@ -39,7 +39,8 @@ def admin_user():
|
||||
marked_for_deletion=False,
|
||||
marked_for_deletion_at=None,
|
||||
deletion_in_progress=False,
|
||||
deletion_attempted_at=None
|
||||
deletion_attempted_at=None,
|
||||
role='admin'
|
||||
)
|
||||
users_db.insert(user.to_dict())
|
||||
|
||||
@@ -63,7 +64,8 @@ def setup_deletion_queue():
|
||||
marked_for_deletion=False,
|
||||
marked_for_deletion_at=None,
|
||||
deletion_in_progress=False,
|
||||
deletion_attempted_at=None
|
||||
deletion_attempted_at=None,
|
||||
role='admin'
|
||||
)
|
||||
users_db.insert(admin.to_dict())
|
||||
|
||||
@@ -185,7 +187,8 @@ class TestGetDeletionQueue:
|
||||
marked_for_deletion=False,
|
||||
marked_for_deletion_at=None,
|
||||
deletion_in_progress=False,
|
||||
deletion_attempted_at=None
|
||||
deletion_attempted_at=None,
|
||||
role='admin'
|
||||
)
|
||||
users_db.insert(admin.to_dict())
|
||||
|
||||
@@ -340,7 +343,8 @@ class TestTriggerDeletionQueue:
|
||||
marked_for_deletion=False,
|
||||
marked_for_deletion_at=None,
|
||||
deletion_in_progress=False,
|
||||
deletion_attempted_at=None
|
||||
deletion_attempted_at=None,
|
||||
role='admin'
|
||||
)
|
||||
users_db.insert(admin.to_dict())
|
||||
|
||||
@@ -353,14 +357,74 @@ class TestTriggerDeletionQueue:
|
||||
|
||||
|
||||
class TestAdminRoleValidation:
|
||||
"""Tests for admin role validation (placeholder for future implementation)."""
|
||||
"""Tests for admin role validation."""
|
||||
|
||||
def test_non_admin_user_access(self, client):
|
||||
"""
|
||||
Test that non-admin users cannot access admin endpoints.
|
||||
"""
|
||||
users_db.truncate()
|
||||
|
||||
NOTE: This test will need to be updated once admin role validation
|
||||
is implemented. Currently, all authenticated users can access admin endpoints.
|
||||
# Create non-admin user (role='user')
|
||||
user = User(
|
||||
id='regular_user',
|
||||
email='user@example.com',
|
||||
first_name='Test',
|
||||
last_name='User',
|
||||
password='hash',
|
||||
marked_for_deletion=False,
|
||||
marked_for_deletion_at=None,
|
||||
deletion_in_progress=False,
|
||||
deletion_attempted_at=None,
|
||||
role='user'
|
||||
)
|
||||
users_db.insert(user.to_dict())
|
||||
|
||||
# Create token for non-admin
|
||||
token = jwt.encode({'user_id': 'regular_user'}, 'supersecretkey', algorithm='HS256')
|
||||
|
||||
client.set_cookie('token', token)
|
||||
response = client.get('/admin/deletion-queue')
|
||||
|
||||
# Should return 403 Forbidden
|
||||
assert response.status_code == 403
|
||||
data = response.get_json()
|
||||
assert data['code'] == 'ADMIN_REQUIRED'
|
||||
assert 'Admin access required' in data['error']
|
||||
|
||||
def test_admin_user_access(self, client):
|
||||
"""
|
||||
Test that admin users can access admin endpoints.
|
||||
"""
|
||||
users_db.truncate()
|
||||
|
||||
# Create admin user (role='admin')
|
||||
admin = User(
|
||||
id='admin_user',
|
||||
email='admin@example.com',
|
||||
first_name='Admin',
|
||||
last_name='User',
|
||||
password='hash',
|
||||
marked_for_deletion=False,
|
||||
marked_for_deletion_at=None,
|
||||
deletion_in_progress=False,
|
||||
deletion_attempted_at=None,
|
||||
role='admin'
|
||||
)
|
||||
users_db.insert(admin.to_dict())
|
||||
|
||||
# Create token for admin
|
||||
token = jwt.encode({'user_id': 'admin_user'}, 'supersecretkey', algorithm='HS256')
|
||||
|
||||
client.set_cookie('token', token)
|
||||
response = client.get('/admin/deletion-queue')
|
||||
|
||||
# Should succeed
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_update_threshold_requires_admin(self, client):
|
||||
"""
|
||||
Test that updating deletion threshold requires admin role.
|
||||
"""
|
||||
users_db.truncate()
|
||||
|
||||
@@ -369,26 +433,17 @@ class TestAdminRoleValidation:
|
||||
id='regular_user',
|
||||
email='user@example.com',
|
||||
first_name='Test',
|
||||
last_name='User',
|
||||
password='hash',
|
||||
marked_for_deletion=False,
|
||||
marked_for_deletion_at=None,
|
||||
deletion_in_progress=False,
|
||||
deletion_attempted_at=None
|
||||
last_name='User',
|
||||
password='hash',
|
||||
role='user'
|
||||
)
|
||||
users_db.insert(user.to_dict())
|
||||
|
||||
# Create token for non-admin
|
||||
token = jwt.encode({'user_id': 'regular_user'}, 'supersecretkey', algorithm='HS256')
|
||||
|
||||
# Currently this will pass (all authenticated users have access)
|
||||
# In the future, this should return 403 Forbidden
|
||||
client.set_cookie('token', token)
|
||||
response = client.get('/admin/deletion-queue')
|
||||
response = client.put('/admin/deletion-threshold', json={'threshold_hours': 168})
|
||||
|
||||
# TODO: Change to 403 once admin role validation is implemented
|
||||
assert response.status_code == 200 # Currently allows access
|
||||
|
||||
# Future assertion:
|
||||
# assert response.status_code == 403
|
||||
# assert response.get_json()['code'] == 'FORBIDDEN'
|
||||
assert response.status_code == 403
|
||||
data = response.get_json()
|
||||
assert data['code'] == 'ADMIN_REQUIRED'
|
||||
|
||||
@@ -18,6 +18,7 @@ export interface User {
|
||||
marked_for_deletion_at: string | null
|
||||
deletion_in_progress: boolean
|
||||
deletion_attempted_at: string | null
|
||||
role: string
|
||||
}
|
||||
|
||||
export interface Child {
|
||||
|
||||
Reference in New Issue
Block a user