import os import asyncio from fastapi import FastAPI, Request, Depends, HTTPException from starlette.middleware.sessions import SessionMiddleware from starlette.staticfiles import StaticFiles from starlette.responses import FileResponse, RedirectResponse from fastapi.templating import Jinja2Templates from starlette.websockets import WebSocket from contextlib import asynccontextmanager from sqlalchemy.orm import Session import models from database import engine, SessionLocal import auth # Import the new auth module import schemas from starlette.responses import Response from config import settings # Import settings to get the secret key from listener_manager import ListenerManager from websocket_manager import WebSocketManager # --- Absolute Path Configuration --- # Get the absolute path of the directory where this file is located BASE_DIR = os.path.dirname(os.path.abspath(__file__)) STATIC_DIR = os.path.join(BASE_DIR, "static") TEMPLATES_DIR = os.path.join(BASE_DIR, "templates") async def background_listener_startup(app: FastAPI): """A non-blocking task to start listeners after the app has started.""" print("Background task: Starting listeners for all users...") db = SessionLocal() users = db.query(models.User).all() db.close() for user in users: # Use try/except to ensure one failing listener doesn't stop others try: await app.state.listener_manager.start_listener_for_user(user, app.state.websocket_manager) except Exception as e: print(f"ERROR: Failed to start listener for user {user.id} ({user.username}): {e}") @asynccontextmanager async def lifespan(app: FastAPI): # This code runs on startup print("Application startup: Creating database tables...") app.state.websocket_manager = WebSocketManager() app.state.listener_manager = ListenerManager() models.Base.metadata.create_all(bind=engine) print("Application startup: Database tables created.") # Decouple listener startup from the main application startup asyncio.create_task(background_listener_startup(app)) yield # This code runs on shutdown print("Application shutdown: Stopping all listeners...") manager = app.state.listener_manager # Create a copy of keys to avoid runtime errors from changing dict size for user_id in list(manager.active_listeners.keys()): await manager.stop_listener_for_user(user_id) app = FastAPI(lifespan=lifespan) # Configure Jinja2 templates templates = Jinja2Templates(directory=TEMPLATES_DIR) # Add session middleware. A secret key is required for signing the session cookie. # We can reuse our encryption key for this, but in production you might want a separate key. app.add_middleware(SessionMiddleware, secret_key=settings.ENCRYPTION_KEY) # Mount the 'static' directory using an absolute path for reliability # This MUST be done before the routes that depend on it are defined. app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") # Add the authentication router app.include_router(auth.router) @app.get("/") async def read_root(request: Request): return templates.TemplateResponse("login.html", {"request": request}) @app.get("/dashboard") async def read_dashboard(request: Request, db: Session = Depends(auth.get_db)): # This is our protected route. It checks if a user_id exists in the session. user_id = request.session.get('user_id') if not user_id: # If not, redirect them to the login page. return RedirectResponse(url="/") user = db.query(models.User).filter(models.User.id == user_id).first() overlay_url = f"{settings.APP_BASE_URL}/overlay/{user.id}" # Ensure user has settings, create if they don't for some reason if not user.settings: user.settings = models.Setting() db.commit() return templates.TemplateResponse("dashboard.html", { "request": request, "user": user, "overlay_url": overlay_url, "current_theme": user.settings.overlay_theme, "settings": settings, "custom_themes": user.custom_themes }) @app.get("/logout") async def logout(request: Request): # Clear the session cookie request.session.clear() return RedirectResponse(url="/") @app.get("/help/css") async def css_help(request: Request): return templates.TemplateResponse("help_css.html", {"request": request}) @app.get("/overlay/{user_id}") async def read_overlay(request: Request, user_id: int, theme_override: str = None, db: Session = Depends(auth.get_db)): # This endpoint serves the overlay page. user = db.query(models.User).filter(models.User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") theme_name = "dark-purple" # Default theme if theme_override: theme_name = theme_override elif user.settings and user.settings.overlay_theme: theme_name = user.settings.overlay_theme # Check if it's a custom theme if theme_name.startswith("custom-"): theme_id = int(theme_name.split("-")[1]) theme = db.query(models.CustomTheme).filter(models.CustomTheme.id == theme_id, models.CustomTheme.owner_id == user.id).first() if not theme: raise HTTPException(status_code=404, detail="Custom theme not found") # Use a generic overlay template that will link to the dynamic CSS return templates.TemplateResponse("overlay-custom.html", {"request": request, "theme_id": theme.id}) return templates.TemplateResponse(f"overlay-{theme_name}.html", {"request": request}) @app.post("/api/settings") async def update_settings(settings_data: schemas.SettingsUpdate, request: Request, db: Session = Depends(auth.get_db)): user_id = request.session.get('user_id') if not user_id: raise HTTPException(status_code=401, detail="Not authenticated") user = db.query(models.User).filter(models.User.id == user_id).first() if not user.settings: user.settings = models.Setting() user.settings.overlay_theme = settings_data.overlay_theme db.commit() return {"message": "Settings updated successfully"} @app.post("/api/themes", response_model=schemas.CustomTheme) async def create_theme(theme_data: schemas.CustomThemeCreate, request: Request, db: Session = Depends(auth.get_db)): user_id = request.session.get('user_id') if not user_id: raise HTTPException(status_code=401, detail="Not authenticated") new_theme = models.CustomTheme(**theme_data.dict(), owner_id=user_id) db.add(new_theme) db.commit() db.refresh(new_theme) return new_theme @app.delete("/api/themes/{theme_id}") async def delete_theme(theme_id: int, request: Request, db: Session = Depends(auth.get_db)): user_id = request.session.get('user_id') if not user_id: raise HTTPException(status_code=401, detail="Not authenticated") theme = db.query(models.CustomTheme).filter(models.CustomTheme.id == theme_id, models.CustomTheme.owner_id == user_id).first() if not theme: raise HTTPException(status_code=404, detail="Theme not found") db.delete(theme) db.commit() return {"message": "Theme deleted successfully"} @app.get("/css/custom/{theme_id}") async def get_custom_css(theme_id: int, db: Session = Depends(auth.get_db)): theme = db.query(models.CustomTheme).filter(models.CustomTheme.id == theme_id).first() if not theme: raise HTTPException(status_code=404, detail="Custom theme not found") return Response(content=theme.css_content, media_type="text/css") @app.websocket("/ws/{user_id}") async def websocket_endpoint(websocket: WebSocket, user_id: int): manager = websocket.app.state.websocket_manager await manager.connect(user_id, websocket) try: while True: # Keep the connection alive await websocket.receive_text() except Exception: manager.disconnect(user_id, websocket) print(f"WebSocket for user {user_id} disconnected.")