attempting proxying using shared manager

This commit is contained in:
2025-12-09 16:26:49 -05:00
parent 7d23bf60cf
commit 3e89f8fae6
2 changed files with 17 additions and 11 deletions

View File

@@ -3,6 +3,7 @@ import queue
import uuid import uuid
from typing import Dict, Any from typing import Dict, Any
import logging import logging
from multiprocessing import Manager
from flask import Response from flask import Response
@@ -10,6 +11,7 @@ from events.types.event import Event
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
manager = Manager()
# Maps user_id → dict of {connection_id: queue} # Maps user_id → dict of {connection_id: queue}
user_queues: Dict[str, Dict[str, queue.Queue]] = {} user_queues: Dict[str, Dict[str, queue.Queue]] = {}
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
@@ -18,30 +20,23 @@ logging.basicConfig(level=logging.INFO)
def get_queue(user_id: str, connection_id: str) -> queue.Queue: def get_queue(user_id: str, connection_id: str) -> queue.Queue:
"""Get or create a queue for a specific user connection.""" """Get or create a queue for a specific user connection."""
if user_id not in user_queues: if user_id not in user_queues:
user_queues[user_id] = {} user_queues[user_id] = manager.dict()
if connection_id not in user_queues[user_id]: if connection_id not in user_queues[user_id]:
user_queues[user_id][connection_id] = queue.Queue() user_queues[user_id][connection_id] = manager.Queue()
return user_queues[user_id][connection_id] return user_queues[user_id][connection_id]
def send_to_user(user_id: str, data: Dict[str, Any]): def send_to_user(user_id: str, data: Dict[str, Any]):
"""Send data to all connections for a specific user.""" """Send data to all connections for a specific user."""
logger.info(f"Sending data to {user_id} user quesues are {user_queues.keys()}") logger.info(f"Sending data to {user_id}")
if user_id in user_queues: if user_id in user_queues:
logger.info(f"Queued {user_id}")
# Format as SSE message once
message = f"data: {json.dumps(data)}\n\n".encode('utf-8') message = f"data: {json.dumps(data)}\n\n".encode('utf-8')
# Send to all connections for this user
for connection_id, q in user_queues[user_id].items(): for connection_id, q in user_queues[user_id].items():
try: try:
logger.info(f"Sending message to {connection_id}")
q.put(message)
q.put(message, block=False) q.put(message, block=False)
except queue.Full: except queue.Full:
# Skip if queue is full (connection might be dead)
pass pass
@@ -82,3 +77,9 @@ def sse_response_for_user(user_id: str):
'Connection': 'keep-alive' 'Connection': 'keep-alive'
} }
) )
_shared_manager = None
def set_shared_manager(manager):
global _shared_manager
_shared_manager = manager

View File

@@ -1,4 +1,5 @@
import sys, logging import sys, logging
from multiprocessing import Manager
from flask import Flask, request from flask import Flask, request
from flask_cors import CORS from flask_cors import CORS
@@ -8,7 +9,7 @@ from api.image_api import image_api
from api.reward_api import reward_api from api.reward_api import reward_api
from api.task_api import task_api from api.task_api import task_api
from events.broadcaster import Broadcaster from events.broadcaster import Broadcaster
from events.sse import sse_response_for_user, send_to_user from events.sse import sse_response_for_user, send_to_user, set_shared_manager
from db.default import initializeImages from db.default import initializeImages
# Configure logging once at application startup # Configure logging once at application startup
@@ -21,6 +22,10 @@ logging.basicConfig(
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Create shared manager
manager = Manager()
set_shared_manager(manager)
app = Flask(__name__) app = Flask(__name__)
#CORS(app, resources={r"/api/*": {"origins": ["http://localhost:3000", "http://localhost:5173"]}}) #CORS(app, resources={r"/api/*": {"origins": ["http://localhost:3000", "http://localhost:5173"]}})
app.register_blueprint(child_api) app.register_blueprint(child_api)