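"""Server-sent events (SSE) fan-out.

Keeps one in-memory queue per open SSE connection, grouped by user, and
streams queued messages to each connection as ``text/event-stream``.
"""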
import json
|
|
import queue
|
|
import uuid
|
|
from typing import Dict, Any
|
|
import logging
|
|
|
|
from flask import Response
|
|
|
|
from events.types.event import Event
|
|
|
|
# Root logging is configured at INFO as a side effect of importing this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Registry of open SSE connections: user_id -> {connection_id: queue}.
user_queues: Dict[str, Dict[str, queue.Queue]] = {}
|
|
|
|
|
|
def get_queue(user_id: str, connection_id: str) -> queue.Queue:
    """Return the queue for one connection of a user, creating it on demand.

    Args:
        user_id: Key of the user in the module-level ``user_queues`` registry.
        connection_id: Key of the connection within that user's entry.

    Returns:
        The existing queue for ``(user_id, connection_id)``, or a newly
        created unbounded ``queue.Queue`` that has been stored in the registry.
    """
    # Fetch the per-user mapping, inserting an empty one on first use.
    connections = user_queues.setdefault(user_id, {})
    try:
        return connections[connection_id]
    except KeyError:
        # First time this connection is seen: register a fresh queue.
        new_queue = queue.Queue()
        connections[connection_id] = new_queue
        return new_queue
|
|
|
|
|
|
def send_to_user(user_id: str, data: Dict[str, Any]):
    """Queue an SSE message for every open connection of a user.

    ``data`` is JSON-encoded once, wrapped in the SSE ``data: ...`` framing,
    and placed exactly once on each of the user's connection queues. Users
    with no entry in ``user_queues`` are skipped silently.

    Args:
        user_id: Key of the target user in ``user_queues``.
        data: JSON-serializable payload to deliver.

    Raises:
        TypeError: Propagated from ``json.dumps`` if ``data`` is not
            JSON-serializable.
    """
    logger.info(
        "Sending data to %s user quesues are %s", user_id, user_queues.keys()
    )
    connections = user_queues.get(user_id)
    if connections is None:
        return

    logger.info("Queued %s", user_id)
    # Format as an SSE message once and reuse the bytes for every connection.
    message = f"data: {json.dumps(data)}\n\n".encode("utf-8")

    # Iterate over a snapshot: the SSE generator's cleanup deletes entries
    # from this dict when a client disconnects, which may happen while this
    # loop is running (assumes a threaded/concurrent server - confirm).
    for connection_id, q in list(connections.items()):
        logger.info("Sending message to %s", connection_id)
        try:
            # Enqueue exactly once and never block: a full queue most likely
            # belongs to a stalled or dead connection.
            q.put_nowait(message)
        except queue.Full:
            logger.warning(
                "Queue full for connection %s; dropping message", connection_id
            )
|
|
|
|
|
|
def send_event_to_user(user_id: str, event: Event):
    """Deliver an ``Event`` to every connection of the given user.

    The event is converted with its ``to_dict()`` method and the resulting
    payload is handed to ``send_to_user``.
    """
    payload = event.to_dict()
    send_to_user(user_id, payload)
|
|
|
|
|
|
def sse_response_for_user(user_id: str):
    """Create a streaming SSE response for a new connection of a user.

    A unique connection id is generated and a queue is registered for it via
    ``get_queue``. The returned response streams every message placed on that
    queue until the stream ends.

    Args:
        user_id: Key of the user the connection belongs to.

    Returns:
        A Flask ``Response`` with mimetype ``text/event-stream`` whose body is
        a generator yielding queued messages.
    """
    # Generate a unique connection ID so one user can hold several streams.
    connection_id = str(uuid.uuid4())
    user_queue = get_queue(user_id, connection_id)
    logger.info(
        "SSE response for %s user is %s", user_id, user_queues.keys()
    )

    def generate():
        """Yield queued messages forever; unregister the queue on exit."""
        try:
            while True:
                # Blocks until a message is available for this connection.
                logger.info("blocking on get for %s user", user_id)
                yield user_queue.get()
        finally:
            # Runs on client disconnect (GeneratorExit) and on any other
            # error, so the connection queue never lingers in the registry.
            connections = user_queues.get(user_id)
            if connections is not None:
                connections.pop(connection_id, None)
                # Remove the user entry if no connections remain.
                if not connections:
                    user_queues.pop(user_id, None)

    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            # Presumably disables reverse-proxy (e.g. nginx) response
            # buffering so events are flushed immediately - confirm.
            'X-Accel-Buffering': 'no',
            'Connection': 'keep-alive',
        },
    )
|