Files
MultiChatOverlay/main.py

211 lines
8.6 KiB
Python

import os
import logging
import asyncio
from fastapi import FastAPI, Request, Depends, HTTPException
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
from starlette.middleware.sessions import SessionMiddleware
from starlette.staticfiles import StaticFiles
from starlette.responses import FileResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from starlette.websockets import WebSocket
from contextlib import asynccontextmanager
from sqlalchemy.orm import Session
import models
from database import engine, SessionLocal
import auth # Import the new auth module
import schemas
from starlette.responses import Response
from config import settings # Import settings to get the secret key
from listener_manager import ListenerManager
from websocket_manager import WebSocketManager
# --- Absolute Path Configuration ---
# Resolve resource directories relative to this file instead of the process
# working directory, so lookups do not depend on where the server is launched.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Directory served under the /static mount.
STATIC_DIR = os.path.join(BASE_DIR, "static")
# Directory the Jinja2 template loader reads from (see get_templates).
TEMPLATES_DIR = os.path.join(BASE_DIR, "templates")
# --- Logging Configuration ---
# Configure the root logger at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def background_listener_startup(app: FastAPI):
    """Start a listener for every stored user, one user at a time.

    Scheduled by ``lifespan`` as a background task so application startup
    does not wait for the listeners.

    Args:
        app: The application; ``app.state.listener_manager`` and
            ``app.state.websocket_manager`` must already be set.
    """
    logger.info("Background task: Starting listeners for all users...")
    db = SessionLocal()
    try:
        users = db.query(models.User).all()
    finally:
        # Always release the session, even when the query itself fails.
        db.close()
    # NOTE(review): the user objects are detached from the session from here
    # on; confirm the listener only reads attributes that are already loaded.
    for user in users:
        # Use try/except to ensure one failing listener doesn't stop others
        try:
            await app.state.listener_manager.start_listener_for_user(user, app.state.websocket_manager)
        except Exception as e:
            logger.error(f"Failed to start listener for user {user.id} ({user.username}): {e}")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup: creates the WebSocket and listener managers on ``app.state``,
    creates the database tables, and schedules listener startup in the
    background. Shutdown: cancels the startup task if it is still running,
    then stops every active listener.

    Args:
        app: The FastAPI application being started.
    """
    # This code runs on startup
    logger.info("Application startup: Creating database tables...")
    app.state.websocket_manager = WebSocketManager()
    app.state.listener_manager = ListenerManager()
    models.Base.metadata.create_all(bind=engine)
    logger.info("Application startup: Database tables created.")
    # Decouple listener startup from the main application startup.
    # Keep a reference to the task: the event loop only holds weak references,
    # so an unreferenced task may be garbage-collected before it finishes.
    startup_task = asyncio.create_task(background_listener_startup(app))
    yield
    # This code runs on shutdown
    # Stop the startup task first so it cannot start new listeners while the
    # loop below is stopping them. cancel() is a no-op on a finished task.
    startup_task.cancel()
    (startup_outcome,) = await asyncio.gather(startup_task, return_exceptions=True)
    if isinstance(startup_outcome, BaseException) and not isinstance(startup_outcome, asyncio.CancelledError):
        logger.error(f"Listener startup task failed: {startup_outcome}")
    logger.info("Application shutdown: Stopping all listeners...")
    manager = app.state.listener_manager
    # Create a copy of keys to avoid runtime errors from changing dict size
    for user_id in list(manager.active_listeners.keys()):
        # Mirror the startup handling: one failing listener must not prevent
        # the remaining listeners from being stopped.
        try:
            await manager.stop_listener_for_user(user_id)
        except Exception as e:
            logger.error(f"Failed to stop listener for user {user_id}: {e}")
# Create the application; startup/shutdown work is handled by `lifespan`.
app = FastAPI(lifespan=lifespan)
# Add middleware to trust proxy headers (X-Forwarded-For, X-Forwarded-Proto)
# This is crucial for running behind a reverse proxy like Nginx or Caddy.
# NOTE(review): trusted_hosts="*" accepts forwarded headers from any peer. If
# the app can be reached without passing through the proxy, clients could
# spoof these headers — consider restricting this to the proxy's address.
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")
# Add session middleware. A secret key is required for signing the session cookie.
# We can reuse our encryption key for this, but in production you might want a separate key.
# NOTE(review): sharing one secret between two purposes means rotating either
# one affects both — confirm this is acceptable.
app.add_middleware(SessionMiddleware, secret_key=settings.ENCRYPTION_KEY)
# Mount the 'static' directory using an absolute path for reliability
# This MUST be done before the routes that depend on it are defined.
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
# Add the authentication router
app.include_router(auth.router)
# --- Template Dependency ---
def get_templates():
    """Dependency that builds a Jinja2 template renderer for TEMPLATES_DIR.

    Returns:
        A new ``Jinja2Templates`` instance on every call.
    """
    renderer = Jinja2Templates(directory=TEMPLATES_DIR)
    return renderer
@app.get("/")
async def read_root(request: Request, templates: Jinja2Templates = Depends(get_templates)):
    """Render the login page (``login.html``)."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
@app.get("/dashboard")
async def read_dashboard(request: Request, db: Session = Depends(auth.get_db),
                         templates: Jinja2Templates = Depends(get_templates)):
    """Render the dashboard for the logged-in user.

    This is a protected route: without a ``user_id`` in the session, or when
    that id no longer matches a stored user, the visitor is redirected to the
    login page.

    Args:
        request: Incoming request; its session carries ``user_id``.
        db: Database session.
        templates: Template renderer.

    Returns:
        The rendered ``dashboard.html``, or a redirect to ``/``.
    """
    user_id = request.session.get('user_id')
    if not user_id:
        # Not logged in: send them to the login page.
        return RedirectResponse(url="/")
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if user is None:
        # Stale session (the user row is gone). Drop the session and treat
        # the visitor as logged out instead of failing on `user.id`.
        request.session.clear()
        return RedirectResponse(url="/")
    overlay_url = f"{settings.APP_BASE_URL}/overlay/{user.id}"
    # Ensure user has settings, create if they don't for some reason
    if not user.settings:
        user.settings = models.Setting()
        db.commit()
    return templates.TemplateResponse("dashboard.html", {
        "request": request,
        "user": user,
        "overlay_url": overlay_url,
        "current_theme": user.settings.overlay_theme,
        "settings": settings,
        "custom_themes": user.custom_themes
    })
@app.get("/logout")
async def logout(request: Request):
    """Log the user out by emptying the session, then go to the login page."""
    session = request.session
    session.clear()
    login_redirect = RedirectResponse(url="/")
    return login_redirect
@app.get("/help/css")
async def css_help(request: Request, templates: Jinja2Templates = Depends(get_templates)):
    """Render the CSS help page (``help_css.html``)."""
    context = {"request": request}
    return templates.TemplateResponse("help_css.html", context)
@app.get("/overlay/{user_id}")
async def read_overlay(request: Request, user_id: int, theme_override: str = None,
                       db: Session = Depends(auth.get_db), templates: Jinja2Templates = Depends(get_templates)):
    """Serve the overlay page for a user.

    Theme selection order: ``theme_override`` if given, otherwise the user's
    saved ``overlay_theme``, otherwise ``"dark-purple"``. Names starting with
    ``custom-`` refer to one of the user's own custom themes.

    Args:
        request: Incoming request.
        user_id: Id of the user whose overlay is requested.
        theme_override: Optional theme name that takes precedence over the
            saved setting.
        db: Database session.
        templates: Template renderer.

    Raises:
        HTTPException: 404 when the user, the custom theme, or the template
            for the theme does not exist.
    """
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    theme_name = "dark-purple"  # Default theme
    if theme_override:
        theme_name = theme_override
    elif user.settings and user.settings.overlay_theme:
        theme_name = user.settings.overlay_theme
    # Check if it's a custom theme
    if theme_name.startswith("custom-"):
        try:
            theme_id = int(theme_name.split("-")[1])
        except ValueError:
            # e.g. "custom-abc": answer 404 rather than failing with a 500.
            raise HTTPException(status_code=404, detail="Custom theme not found") from None
        theme = db.query(models.CustomTheme).filter(
            models.CustomTheme.id == theme_id,
            models.CustomTheme.owner_id == user.id,
        ).first()
        if not theme:
            raise HTTPException(status_code=404, detail="Custom theme not found")
        # Use a generic overlay template that will link to the dynamic CSS
        return templates.TemplateResponse("overlay-custom.html", {"request": request, "theme_id": theme.id})
    # theme_name may come straight from the query string, so only render it
    # when it is a plain file name for a template that actually exists.
    template_name = f"overlay-{theme_name}.html"
    is_plain_name = os.path.basename(template_name) == template_name
    if not is_plain_name or not os.path.isfile(os.path.join(TEMPLATES_DIR, template_name)):
        raise HTTPException(status_code=404, detail="Theme not found")
    return templates.TemplateResponse(template_name, {"request": request})
@app.post("/api/settings")
async def update_settings(settings_data: schemas.SettingsUpdate, request: Request, db: Session = Depends(auth.get_db)):
    """Store the overlay theme chosen by the logged-in user.

    Args:
        settings_data: Payload whose ``overlay_theme`` is saved.
        request: Incoming request; its session carries ``user_id``.
        db: Database session.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 401 when there is no session user, or the session
            refers to a user that no longer exists.
    """
    user_id = request.session.get('user_id')
    if not user_id:
        raise HTTPException(status_code=401, detail="Not authenticated")
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if user is None:
        # Stale session: reject it instead of failing on `user.settings`.
        raise HTTPException(status_code=401, detail="Not authenticated")
    if not user.settings:
        user.settings = models.Setting()
    user.settings.overlay_theme = settings_data.overlay_theme
    db.commit()
    return {"message": "Settings updated successfully"}
@app.post("/api/themes", response_model=schemas.CustomTheme)
async def create_theme(theme_data: schemas.CustomThemeCreate, request: Request, db: Session = Depends(auth.get_db)):
    """Create a custom theme owned by the logged-in user.

    Args:
        theme_data: Fields for the new theme.
        request: Incoming request; its session carries ``user_id``.
        db: Database session.

    Returns:
        The newly stored theme, refreshed from the database.

    Raises:
        HTTPException: 401 when there is no session user.
    """
    owner_id = request.session.get('user_id')
    if not owner_id:
        raise HTTPException(status_code=401, detail="Not authenticated")
    theme_fields = theme_data.dict()
    created = models.CustomTheme(**theme_fields, owner_id=owner_id)
    db.add(created)
    db.commit()
    # Reload so database-generated values are present on the returned object.
    db.refresh(created)
    return created
@app.delete("/api/themes/{theme_id}")
async def delete_theme(theme_id: int, request: Request, db: Session = Depends(auth.get_db)):
    """Delete one of the logged-in user's custom themes.

    Args:
        theme_id: Id of the theme to delete.
        request: Incoming request; its session carries ``user_id``.
        db: Database session.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 401 when there is no session user; 404 when no theme
            with this id belongs to that user.
    """
    session_user_id = request.session.get('user_id')
    if not session_user_id:
        raise HTTPException(status_code=401, detail="Not authenticated")
    # Match on both id and owner so users can only delete their own themes.
    id_matches = models.CustomTheme.id == theme_id
    owned_by_caller = models.CustomTheme.owner_id == session_user_id
    target = db.query(models.CustomTheme).filter(id_matches, owned_by_caller).first()
    if not target:
        raise HTTPException(status_code=404, detail="Theme not found")
    db.delete(target)
    db.commit()
    return {"message": "Theme deleted successfully"}
@app.get("/css/custom/{theme_id}")
async def get_custom_css(theme_id: int, db: Session = Depends(auth.get_db)):
    """Return the stored CSS of a custom theme as a ``text/css`` response.

    The lookup is by theme id only; no session or ownership check is made.

    Args:
        theme_id: Id of the custom theme.
        db: Database session.

    Raises:
        HTTPException: 404 when no theme has this id.
    """
    lookup = db.query(models.CustomTheme).filter(models.CustomTheme.id == theme_id)
    found = lookup.first()
    if found:
        return Response(content=found.css_content, media_type="text/css")
    raise HTTPException(status_code=404, detail="Custom theme not found")
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int):
    """Register a WebSocket for a user and hold it open until it ends.

    Incoming text is read and discarded; the receive loop exists only to
    keep the connection alive and to notice when the socket goes away.

    Args:
        websocket: The incoming WebSocket connection.
        user_id: Id of the user this connection is registered under.
    """
    manager = websocket.app.state.websocket_manager
    await manager.connect(user_id, websocket)
    try:
        while True:
            # Keep the connection alive
            await websocket.receive_text()
    except Exception as e:
        # A closed or broken socket surfaces here; cleanup happens below.
        logger.debug(f"WebSocket for user {user_id} ended: {e!r}")
    finally:
        # Use `finally` so the socket is also unregistered when the handler
        # task is cancelled, which `except Exception` does not catch.
        manager.disconnect(user_id, websocket)
        logger.info(f"WebSocket for user {user_id} disconnected.")