Files
chore/events/sse.py

87 lines
2.6 KiB
Python

import json
import queue
import uuid
from typing import Dict, Any
import logging
from multiprocessing import Manager
from flask import Response
from events.types.event import Event
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Created at import time; this manager backs the shared dict below as well as
# the per-user dicts and per-connection queues created in get_queue().
manager = Manager()
# Maps user_id → dict of {connection_id: queue}
# NOTE(review): intended to be shared across all Gunicorn workers; presumably
# that only holds when this module is imported before the workers fork
# (otherwise each worker would build its own Manager) — confirm against the
# deployment configuration.
user_queues = manager.dict()
# Import-time side effect: configures the root logger at INFO level
# (basicConfig does nothing if the root logger already has handlers).
logging.basicConfig(level=logging.INFO)
def get_queue(user_id: str, connection_id: str) -> queue.Queue:
    """Get or create the message queue for one connection of a user.

    Args:
        user_id: Key into the module-level ``user_queues`` mapping.
        connection_id: Key identifying one connection inside that user's
            mapping.

    Returns:
        The queue stored under ``user_queues[user_id][connection_id]``. It is
        created with ``manager.Queue()``, so callers receive a manager queue
        proxy rather than a plain ``queue.Queue``; the annotation is kept
        unchanged for backward compatibility.
    """
    # Look up first: when the entry already exists this is a single manager
    # round trip and no new shared object has to be created.
    connections = user_queues.get(user_id)
    if connections is None:
        # setdefault is one call on the shared dict, so two callers
        # registering the same user at the same moment both end up with the
        # same per-user mapping instead of one overwriting the other's.
        connections = user_queues.setdefault(user_id, manager.dict())

    user_queue = connections.get(connection_id)
    if user_queue is None:
        user_queue = connections.setdefault(connection_id, manager.Queue())
    return user_queue
def send_to_user(user_id: str, data: Dict[str, Any]):
    """Queue ``data`` as one SSE message for every connection of a user.

    The payload is JSON-encoded, framed as ``data: <json>`` followed by a
    blank line, UTF-8 encoded, and put on each queue registered for
    ``user_id`` in ``user_queues``. Nothing is queued when the user has no
    entry.

    Args:
        user_id: User whose connections should receive the message.
        data: JSON-serializable payload.

    Raises:
        TypeError: Propagated from ``json.dumps`` when ``data`` is not
            JSON-serializable.
    """
    logger.info("Sending data to %s", user_id)

    # A single lookup instead of "in" followed by indexing: the entry can be
    # deleted by a disconnecting client between those two steps, which would
    # surface here as a KeyError.
    connections = user_queues.get(user_id)
    if connections is None:
        return

    message = f"data: {json.dumps(data)}\n\n".encode('utf-8')
    for connection_id, user_queue in connections.items():
        try:
            user_queue.put(message, block=False)
        except queue.Full:
            # Delivery stays best-effort (other connections still get the
            # message), but a dropped message is no longer invisible.
            logger.warning(
                "Queue full; dropped message for user %s connection %s",
                user_id,
                connection_id,
            )
def send_event_to_user(user_id: str, event: Event):
    """Deliver ``event`` to every connection of the given user.

    The event is converted with its ``to_dict()`` method and the resulting
    mapping is handed to ``send_to_user``.

    Args:
        user_id: User whose connections should receive the event.
        event: Event to serialize and send.
    """
    payload = event.to_dict()
    send_to_user(user_id, payload)
def sse_response_for_user(user_id: str):
    """Create a streaming SSE response for a new connection of a user.

    A fresh connection id is generated, a queue is registered for it through
    ``get_queue``, and the returned response streams every message put on
    that queue. When the stream ends for any reason, the connection's queue
    is removed from ``user_queues``, and the user's entry is removed as well
    once it has no connections left.

    Args:
        user_id: User the connection belongs to.

    Returns:
        A Flask ``Response`` with mimetype ``text/event-stream`` whose body
        is the message generator.
    """
    # A unique id per call lets one user hold several connections at once.
    connection_id = str(uuid.uuid4())
    user_queue = get_queue(user_id, connection_id)
    logger.info("SSE response for %s user is %s", user_id, user_queues.keys())

    def generate():
        """Yield queued messages until the stream is closed, then clean up."""
        try:
            while True:
                # Blocks until a message is available.
                # NOTE(review): with no timeout here, a closed client is
                # presumably only noticed when the next message is yielded —
                # confirm whether a periodic keep-alive is wanted.
                logger.info("blocking on get for %s user", user_id)
                message = user_queue.get()
                logger.info("-------received message %s", message)
                yield message
        finally:
            # finally (rather than "except GeneratorExit") so the queue is
            # also released when get() or the consumer raises something else.
            connections = user_queues.get(user_id)
            if connections is not None:
                # pop with a default tolerates the entry already being gone.
                connections.pop(connection_id, None)
                # Drop the user entry once no connections remain.
                # NOTE(review): a connection registered between this check
                # and the pop would be lost; closing that window would need
                # a lock shared by all writers.
                if not connections:
                    user_queues.pop(user_id, None)

    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no',
            'Connection': 'keep-alive',
        },
    )
# Manager object injected through set_shared_manager(); None until it is
# called. It is not read anywhere in the visible part of this module.
_shared_manager = None


def set_shared_manager(manager):
    """Store ``manager`` in the module-level ``_shared_manager`` slot.

    Args:
        manager: Object to remember. Inside this function the parameter
            shadows the module-level ``manager``; only ``_shared_manager``
            is rebound.
    """
    global _shared_manager
    _shared_manager = manager