From 3e89f8fae6a049207c67c1aac9bb6b40a38a8ff0 Mon Sep 17 00:00:00 2001 From: Ryan Kegel Date: Tue, 9 Dec 2025 16:26:49 -0500 Subject: [PATCH] attempting proxying using shared manager --- events/sse.py | 21 +++++++++++---------- main.py | 7 ++++++- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/events/sse.py b/events/sse.py index d679110..3e0da8a 100644 --- a/events/sse.py +++ b/events/sse.py @@ -3,6 +3,7 @@ import queue import uuid from typing import Dict, Any import logging +from multiprocessing import Manager from flask import Response @@ -10,6 +11,7 @@ from events.types.event import Event logger = logging.getLogger(__name__) +manager = Manager() # Maps user_id → dict of {connection_id: queue} user_queues: Dict[str, Dict[str, queue.Queue]] = {} logging.basicConfig(level=logging.INFO) @@ -18,30 +20,23 @@ logging.basicConfig(level=logging.INFO) def get_queue(user_id: str, connection_id: str) -> queue.Queue: """Get or create a queue for a specific user connection.""" if user_id not in user_queues: - user_queues[user_id] = {} + user_queues[user_id] = manager.dict() if connection_id not in user_queues[user_id]: - user_queues[user_id][connection_id] = queue.Queue() + user_queues[user_id][connection_id] = manager.Queue() return user_queues[user_id][connection_id] def send_to_user(user_id: str, data: Dict[str, Any]): """Send data to all connections for a specific user.""" - logger.info(f"Sending data to {user_id} user quesues are {user_queues.keys()}") + logger.info(f"Sending data to {user_id}") if user_id in user_queues: - logger.info(f"Queued {user_id}") - # Format as SSE message once message = f"data: {json.dumps(data)}\n\n".encode('utf-8') - - # Send to all connections for this user for connection_id, q in user_queues[user_id].items(): try: - logger.info(f"Sending message to {connection_id}") - q.put(message) q.put(message, block=False) except queue.Full: - # Skip if queue is full (connection might be dead) pass @@ -82,3 +77,9 @@ def sse_response_for_user(user_id: str): 'Connection': 'keep-alive' } ) + +_shared_manager = None + +def set_shared_manager(manager): + global _shared_manager + _shared_manager = manager diff --git a/main.py b/main.py index 90efff0..f5e5a86 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,5 @@ import sys, logging +from multiprocessing import Manager from flask import Flask, request from flask_cors import CORS @@ -8,7 +9,7 @@ from api.image_api import image_api from api.reward_api import reward_api from api.task_api import task_api from events.broadcaster import Broadcaster -from events.sse import sse_response_for_user, send_to_user +from events.sse import sse_response_for_user, send_to_user, set_shared_manager from db.default import initializeImages # Configure logging once at application startup @@ -21,6 +22,10 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) +# Create shared manager +manager = Manager() +set_shared_manager(manager) + app = Flask(__name__) #CORS(app, resources={r"/api/*": {"origins": ["http://localhost:3000", "http://localhost:5173"]}}) app.register_blueprint(child_api)