Files
MultiChatOverlay/main.py

89 lines
3.0 KiB
Python

import asyncio
import json
from fastapi import FastAPI, WebSocket, Request, Depends
from fastapi.responses import HTMLResponse, RedirectResponse
from starlette.websockets import WebSocketDisconnect
from starlette.middleware.base import BaseHTTPMiddleware
from sqlalchemy.orm import Session
from chat_listeners import listen_youtube_chat, listen_twitch_chat
from auth import router as auth_router, serializer
from database import get_db, User
# Application instance; the middleware, router and route handlers in this
# module are all registered on it.
app = FastAPI()
class SessionMiddleware(BaseHTTPMiddleware):
    """Resolve the signed ``session`` cookie into ``request.state.user``.

    ``request.state.user`` is always assigned before the request is passed
    down the stack: it holds the matching ``User`` row, or ``None`` when the
    cookie is absent, invalid, expired, or the lookup fails.
    """

    async def dispatch(self, request: Request, call_next):
        """Attach the current user to the request, then run the handler.

        Args:
            request: Incoming request whose ``session`` cookie is inspected.
            call_next: Callable that forwards the request to the rest of the
                application and returns its response.

        Returns:
            The response produced by ``call_next``.
        """
        # The user must be resolved *before* call_next: anything assigned to
        # request.state afterwards is invisible to the route handlers.
        request.state.user = None
        db_gen = None

        session_cookie = request.cookies.get("session")
        if session_cookie:
            try:
                data = serializer.loads(session_cookie, max_age=3600 * 24 * 7)  # 1 week
                db_gen = get_db()
                db = next(db_gen)
                request.state.user = (
                    db.query(User).filter(User.id == data["user_id"]).first()
                )
            except Exception:
                # Deliberately best-effort: any failure while decoding the
                # cookie or loading the user means "anonymous request".
                request.state.user = None

        try:
            return await call_next(request)
        finally:
            # Release the database session once the handler is done with the
            # user object. Presumably get_db is a generator whose cleanup
            # closes the session -- TODO confirm against database.get_db.
            close = getattr(db_gen, "close", None)
            if close is not None:
                close()
# Install SessionMiddleware so its dispatch() wraps requests handled by the app.
app.add_middleware(SessionMiddleware)
def get_current_user(request: Request):
    """Return the user stored on ``request.state``, or ``None``.

    Falls back to ``None`` instead of raising ``AttributeError`` when nothing
    has populated ``request.state.user`` for this request, so callers can
    treat that case as an anonymous request.

    Args:
        request: The current request.

    Returns:
        Whatever was stored as ``request.state.user``, or ``None`` if the
        attribute is missing.
    """
    return getattr(request.state, "user", None)
# Register the routes of the router imported from the auth module under the "/auth" prefix.
app.include_router(auth_router, prefix="/auth")
# Connections that should receive every broadcast; entries are appended and
# removed by the "/ws" endpoint and pruned here when a send fails.
connected_clients = []


async def broadcast_message(message: dict):
    """Send ``message`` as JSON text to every connected client.

    Clients whose ``send_text`` raises ``RuntimeError`` are treated as
    disconnected and dropped from ``connected_clients``; the remaining
    clients still receive the message.

    Args:
        message: JSON-serialisable payload to broadcast.
    """
    payload = json.dumps(message)
    stale = []

    # Iterate over a snapshot: the list can change while we await (clients
    # connecting or disconnecting), and removing entries from the list being
    # iterated would silently skip the next client.
    for client in list(connected_clients):
        try:
            await client.send_text(payload)
        except RuntimeError:
            # NOTE(review): some Starlette versions may raise
            # WebSocketDisconnect from send instead -- confirm and extend.
            stale.append(client)

    for client in stale:
        # The endpoint handler may already have removed it while we awaited.
        if client in connected_clients:
            connected_clients.remove(client)
# Strong references to the listener tasks. The event loop itself only keeps
# weak references, so a task nobody else references can be garbage-collected
# before it finishes.
_background_tasks = set()


@app.on_event("startup")
async def startup_event():
    """Launch the YouTube and Twitch chat listeners as background tasks.

    Both listeners are handed ``broadcast_message`` as their callback. The
    identifiers passed below are placeholders and must be replaced with a
    real video ID and Twitch token/channel.
    """
    listeners = (
        listen_youtube_chat("YOUR_YOUTUBE_VIDEO_ID", broadcast_message),
        listen_twitch_chat("YOUR_TWITCH_OAUTH_TOKEN", "YOUR_TWITCH_CHANNEL", broadcast_message),
    )
    for listener in listeners:
        task = asyncio.create_task(listener)
        _background_tasks.add(task)
        # Drop the reference once the task completes so the set cannot grow.
        task.add_done_callback(_background_tasks.discard)
@app.get("/")
async def read_root():
    """Serve a fixed greeting object at the root path."""
    greeting = {"Hello": "World"}
    return greeting
@app.get("/login", response_class=HTMLResponse)
async def get_login_page():
    """Serve the contents of ``login.html`` as an HTML response.

    The file is resolved relative to the process working directory.
    """
    # Decode explicitly instead of relying on the platform's locale encoding;
    # assumes the page is saved as UTF-8 -- confirm.
    with open("login.html", "r", encoding="utf-8") as page:
        return page.read()
@app.get("/dashboard", response_class=HTMLResponse)
async def get_dashboard(user: User = Depends(get_current_user)):
    """Serve ``dashboard.html`` to a signed-in user.

    Args:
        user: Value supplied by the ``get_current_user`` dependency; falsy
            when no user is attached to the request.

    Returns:
        A redirect to ``/login`` when ``user`` is falsy, otherwise the
        contents of ``dashboard.html`` (resolved relative to the process
        working directory).
    """
    if not user:
        return RedirectResponse(url="/login")
    # Decode explicitly instead of relying on the platform's locale encoding;
    # assumes the page is saved as UTF-8 -- confirm.
    with open("dashboard.html", "r", encoding="utf-8") as page:
        return page.read()
@app.get("/overlay", response_class=HTMLResponse)
async def get_overlay():
    """Serve the contents of ``index.html`` as the overlay page.

    The file is resolved relative to the process working directory.
    """
    # Decode explicitly instead of relying on the platform's locale encoding;
    # assumes the page is saved as UTF-8 -- confirm.
    with open("index.html", "r", encoding="utf-8") as page:
        return page.read()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept a WebSocket client and keep it registered for broadcasts.

    The connection is added to ``connected_clients`` and then held open by
    reading (and discarding) incoming text frames. It is unregistered again
    however the loop ends.

    Args:
        websocket: The connection being served.
    """
    await websocket.accept()
    connected_clients.append(websocket)
    try:
        while True:
            # Keep the connection alive; incoming messages are ignored.
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Normal client disconnect -- nothing to report.
        pass
    finally:
        # Always unregister, including on unexpected errors. The membership
        # check covers the case where broadcast_message already dropped this
        # socket after a failed send.
        if websocket in connected_clients:
            connected_clients.remove(websocket)