Revert "attempting proxying"
This reverts commit 7d23bf60cf.
changed to 1 worker
This commit is contained in:
@@ -13,4 +13,4 @@ ENV PYTHONUNBUFFERED=1
|
|||||||
ENV PYTHONIOENCODING=utf-8
|
ENV PYTHONIOENCODING=utf-8
|
||||||
|
|
||||||
# Use Gunicorn instead of python main.py
|
# Use Gunicorn instead of python main.py
|
||||||
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "--timeout", "120", "--access-logfile", "-", "--error-logfile", "-", "--log-level", "info", "main:app"]
|
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "-k", "gevent", "--workers", "1", "--timeout", "120", "--access-logfile", "-", "--error-logfile", "-", "--log-level", "info", "main:app"]
|
||||||
@@ -3,7 +3,6 @@ import queue
|
|||||||
import uuid
|
import uuid
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
import logging
|
import logging
|
||||||
from multiprocessing import Manager
|
|
||||||
|
|
||||||
from flask import Response
|
from flask import Response
|
||||||
|
|
||||||
@@ -11,32 +10,38 @@ from events.types.event import Event
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
manager = Manager()
|
|
||||||
# Maps user_id → dict of {connection_id: queue}
|
# Maps user_id → dict of {connection_id: queue}
|
||||||
user_queues = manager.dict() # Shared across all Gunicorn workers
|
user_queues: Dict[str, Dict[str, queue.Queue]] = {}
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
|
|
||||||
def get_queue(user_id: str, connection_id: str) -> queue.Queue:
|
def get_queue(user_id: str, connection_id: str) -> queue.Queue:
|
||||||
"""Get or create a queue for a specific user connection."""
|
"""Get or create a queue for a specific user connection."""
|
||||||
if user_id not in user_queues:
|
if user_id not in user_queues:
|
||||||
user_queues[user_id] = manager.dict()
|
user_queues[user_id] = {}
|
||||||
|
|
||||||
if connection_id not in user_queues[user_id]:
|
if connection_id not in user_queues[user_id]:
|
||||||
user_queues[user_id][connection_id] = manager.Queue()
|
user_queues[user_id][connection_id] = queue.Queue()
|
||||||
|
|
||||||
return user_queues[user_id][connection_id]
|
return user_queues[user_id][connection_id]
|
||||||
|
|
||||||
|
|
||||||
def send_to_user(user_id: str, data: Dict[str, Any]):
|
def send_to_user(user_id: str, data: Dict[str, Any]):
|
||||||
"""Send data to all connections for a specific user."""
|
"""Send data to all connections for a specific user."""
|
||||||
logger.info(f"Sending data to {user_id}")
|
logger.info(f"Sending data to {user_id} user quesues are {user_queues.keys()}")
|
||||||
if user_id in user_queues:
|
if user_id in user_queues:
|
||||||
|
logger.info(f"Queued {user_id}")
|
||||||
|
# Format as SSE message once
|
||||||
message = f"data: {json.dumps(data)}\n\n".encode('utf-8')
|
message = f"data: {json.dumps(data)}\n\n".encode('utf-8')
|
||||||
|
|
||||||
|
# Send to all connections for this user
|
||||||
for connection_id, q in user_queues[user_id].items():
|
for connection_id, q in user_queues[user_id].items():
|
||||||
try:
|
try:
|
||||||
|
logger.info(f"Sending message to {connection_id}")
|
||||||
|
q.put(message)
|
||||||
q.put(message, block=False)
|
q.put(message, block=False)
|
||||||
except queue.Full:
|
except queue.Full:
|
||||||
|
# Skip if queue is full (connection might be dead)
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@@ -58,7 +63,6 @@ def sse_response_for_user(user_id: str):
|
|||||||
# Get message from queue (blocks until available)
|
# Get message from queue (blocks until available)
|
||||||
logger.info(f"blocking on get for {user_id} user")
|
logger.info(f"blocking on get for {user_id} user")
|
||||||
message = user_queue.get()
|
message = user_queue.get()
|
||||||
logger.info(f"-------received message {message}")
|
|
||||||
yield message
|
yield message
|
||||||
except GeneratorExit:
|
except GeneratorExit:
|
||||||
# Clean up when client disconnects
|
# Clean up when client disconnects
|
||||||
@@ -78,9 +82,3 @@ def sse_response_for_user(user_id: str):
|
|||||||
'Connection': 'keep-alive'
|
'Connection': 'keep-alive'
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
_shared_manager = None
|
|
||||||
|
|
||||||
def set_shared_manager(manager):
|
|
||||||
global _shared_manager
|
|
||||||
_shared_manager = manager
|
|
||||||
|
|||||||
7
main.py
7
main.py
@@ -1,5 +1,4 @@
|
|||||||
import sys, logging
|
import sys, logging
|
||||||
from multiprocessing import Manager
|
|
||||||
|
|
||||||
from flask import Flask, request
|
from flask import Flask, request
|
||||||
from flask_cors import CORS
|
from flask_cors import CORS
|
||||||
@@ -9,7 +8,7 @@ from api.image_api import image_api
|
|||||||
from api.reward_api import reward_api
|
from api.reward_api import reward_api
|
||||||
from api.task_api import task_api
|
from api.task_api import task_api
|
||||||
from events.broadcaster import Broadcaster
|
from events.broadcaster import Broadcaster
|
||||||
from events.sse import sse_response_for_user, send_to_user, set_shared_manager
|
from events.sse import sse_response_for_user, send_to_user
|
||||||
from db.default import initializeImages
|
from db.default import initializeImages
|
||||||
|
|
||||||
# Configure logging once at application startup
|
# Configure logging once at application startup
|
||||||
@@ -22,10 +21,6 @@ logging.basicConfig(
|
|||||||
)
|
)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Create shared manager
|
|
||||||
manager = Manager()
|
|
||||||
set_shared_manager(manager)
|
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
#CORS(app, resources={r"/api/*": {"origins": ["http://localhost:3000", "http://localhost:5173"]}})
|
#CORS(app, resources={r"/api/*": {"origins": ["http://localhost:3000", "http://localhost:5173"]}})
|
||||||
app.register_blueprint(child_api)
|
app.register_blueprint(child_api)
|
||||||
|
|||||||
Reference in New Issue
Block a user