201 lines
8.2 KiB
Python
201 lines
8.2 KiB
Python
import os
|
|
import asyncio
|
|
from fastapi import FastAPI, Request, Depends, HTTPException
|
|
from starlette.middleware.sessions import SessionMiddleware
|
|
from starlette.staticfiles import StaticFiles
|
|
from starlette.responses import FileResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from starlette.websockets import WebSocket
|
|
from contextlib import asynccontextmanager
|
|
from sqlalchemy.orm import Session
|
|
|
|
import models
|
|
from database import engine, SessionLocal
|
|
import auth # Import the new auth module
|
|
import schemas
|
|
from starlette.responses import Response
|
|
from config import settings # Import settings to get the secret key
|
|
from listener_manager import ListenerManager
|
|
from websocket_manager import WebSocketManager
|
|
|
|
# --- Absolute Path Configuration ---
# Every filesystem location used by the app is anchored to the directory that
# contains this module, so lookups do not depend on the process's working
# directory.
_THIS_FILE = os.path.abspath(__file__)
BASE_DIR = os.path.dirname(_THIS_FILE)

# Sub-directories holding static assets and Jinja templates respectively.
STATIC_DIR = os.path.join(BASE_DIR, "static")
TEMPLATES_DIR = os.path.join(BASE_DIR, "templates")
|
|
|
|
async def background_listener_startup(app: FastAPI):
    """Start a listener for every stored user once the app is running.

    Loads all ``models.User`` rows through a short-lived session, then asks
    ``app.state.listener_manager`` to start a listener for each of them,
    passing ``app.state.websocket_manager`` along.

    Args:
        app: The application whose ``state`` holds the listener and
            websocket managers (set up in ``lifespan``).
    """
    print("Background task: Starting listeners for all users...")

    db = SessionLocal()
    try:
        users = db.query(models.User).all()
    finally:
        # Always release the session, even if the query itself fails.
        db.close()

    for user in users:
        # Use try/except to ensure one failing listener doesn't stop others
        try:
            await app.state.listener_manager.start_listener_for_user(user, app.state.websocket_manager)
        except Exception as e:
            print(f"ERROR: Failed to start listener for user {user.id} ({user.username}): {e}")
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler: set up on startup, tear down on shutdown.

    Startup: attaches a ``WebSocketManager`` and a ``ListenerManager`` to
    ``app.state``, creates the database tables, and schedules
    ``background_listener_startup`` as a task so listener start-up does not
    delay the application becoming ready.

    Shutdown: cancels the start-up task if it is still running, then stops
    every listener currently registered in ``active_listeners``.

    Args:
        app: The FastAPI application being started.
    """
    # This code runs on startup
    print("Application startup: Creating database tables...")
    app.state.websocket_manager = WebSocketManager()
    app.state.listener_manager = ListenerManager()
    models.Base.metadata.create_all(bind=engine)
    print("Application startup: Database tables created.")

    # Decouple listener startup from the main application startup.
    # Keep the task object: it is needed to cancel/await the task on shutdown
    # and to retrieve any exception it raised.
    startup_task = asyncio.create_task(background_listener_startup(app))

    try:
        yield
    finally:
        # This code runs on shutdown
        print("Application shutdown: Stopping all listeners...")

        # Don't let a still-running start-up task spawn listeners while we
        # are tearing them down; gather() also consumes the task's result so
        # no "exception was never retrieved" warning is left behind.
        if not startup_task.done():
            startup_task.cancel()
        await asyncio.gather(startup_task, return_exceptions=True)

        manager = app.state.listener_manager
        # Create a copy of keys to avoid runtime errors from changing dict size
        for user_id in list(manager.active_listeners):
            # Mirror the start-up policy: one failing listener must not
            # prevent the remaining ones from being stopped.
            try:
                await manager.stop_listener_for_user(user_id)
            except Exception as e:
                print(f"ERROR: Failed to stop listener for user {user_id}: {e}")
|
|
|
|
# The application object; startup/shutdown work is delegated to `lifespan`.
app = FastAPI(lifespan=lifespan)

# Session middleware signs the session cookie and therefore needs a secret.
# The encryption key is reused here; a production deployment may prefer a
# dedicated key for sessions.
app.add_middleware(
    SessionMiddleware,
    secret_key=settings.ENCRYPTION_KEY,
)

# Serve the 'static' directory from its absolute path for reliability.
# This MUST happen before the routes that depend on it are defined.
app.mount(
    "/static",
    StaticFiles(directory=STATIC_DIR),
    name="static",
)

# Register the routes provided by the authentication module.
app.include_router(auth.router)
|
|
|
|
# --- Template Dependency ---
|
|
# Lazily-created template renderer shared by all requests (see get_templates).
_templates_instance = None


def get_templates():
    """Dependency that provides the ``Jinja2Templates`` renderer.

    The renderer is created on first use and then reused, so templates are
    not re-loaded through a brand-new instance on every request.

    Returns:
        The shared ``Jinja2Templates`` bound to ``TEMPLATES_DIR``.
    """
    global _templates_instance
    if _templates_instance is None:
        _templates_instance = Jinja2Templates(directory=TEMPLATES_DIR)
    return _templates_instance
|
|
|
|
@app.get("/")
async def read_root(request: Request, templates: Jinja2Templates = Depends(get_templates)):
    """Render the login page (``login.html``) for the site root."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
|
|
|
|
@app.get("/dashboard")
async def read_dashboard(request: Request, db: Session = Depends(auth.get_db),
                         templates: Jinja2Templates = Depends(get_templates)):
    """Render the dashboard for the logged-in user.

    This is a protected route: the session must carry a ``user_id`` that
    still matches a stored user, otherwise the client is redirected to the
    login page at ``/``.

    Returns:
        The rendered ``dashboard.html`` response, or a redirect to ``/``.
    """
    user_id = request.session.get('user_id')
    if not user_id:
        # Not logged in: send them to the login page.
        return RedirectResponse(url="/")

    user = db.query(models.User).filter(models.User.id == user_id).first()
    if user is None:
        # The session refers to a user that no longer exists. Drop the stale
        # session instead of failing on `user.id` below.
        request.session.clear()
        return RedirectResponse(url="/")

    overlay_url = f"{settings.APP_BASE_URL}/overlay/{user.id}"

    # Ensure user has settings, create if they don't for some reason
    if not user.settings:
        user.settings = models.Setting()
        db.commit()

    return templates.TemplateResponse("dashboard.html", {
        "request": request,
        "user": user,
        "overlay_url": overlay_url,
        "current_theme": user.settings.overlay_theme,
        "settings": settings,
        "custom_themes": user.custom_themes
    })
|
|
|
|
@app.get("/logout")
async def logout(request: Request):
    """Empty the caller's session and redirect to the login page at ``/``."""
    session = request.session
    # Removing every key from the session logs the user out.
    session.clear()
    login_redirect = RedirectResponse(url="/")
    return login_redirect
|
|
|
|
@app.get("/help/css")
async def css_help(request: Request, templates: Jinja2Templates = Depends(get_templates)):
    """Render the CSS help page (``help_css.html``)."""
    context = {"request": request}
    return templates.TemplateResponse("help_css.html", context)
|
|
|
|
@app.get("/overlay/{user_id}")
async def read_overlay(request: Request, user_id: int, theme_override: str = None,
                       db: Session = Depends(auth.get_db), templates: Jinja2Templates = Depends(get_templates)):
    """Serve the overlay page for a user.

    The theme is chosen in this order: the ``theme_override`` query
    parameter, the user's saved ``overlay_theme``, then ``"dark-purple"``.
    Names starting with ``custom-`` refer to one of the user's own
    ``CustomTheme`` rows and render ``overlay-custom.html``; any other name
    renders ``overlay-<name>.html``.

    Args:
        request: The incoming request (passed to the template).
        user_id: Id of the user whose overlay is requested.
        theme_override: Optional theme name that takes precedence over the
            saved setting.

    Raises:
        HTTPException: 404 if the user does not exist, the custom theme id is
            malformed or not owned by the user, or no template exists for the
            requested theme name.
    """
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    theme_name = "dark-purple"  # Default theme
    if theme_override:
        theme_name = theme_override
    elif user.settings and user.settings.overlay_theme:
        theme_name = user.settings.overlay_theme

    # Check if it's a custom theme
    if theme_name.startswith("custom-"):
        try:
            theme_id = int(theme_name.split("-")[1])
        except ValueError:
            # e.g. "custom-abc": caller-supplied garbage is a 404, not a crash.
            raise HTTPException(status_code=404, detail="Custom theme not found") from None
        theme = db.query(models.CustomTheme).filter(models.CustomTheme.id == theme_id, models.CustomTheme.owner_id == user.id).first()
        if not theme:
            raise HTTPException(status_code=404, detail="Custom theme not found")
        # Use a generic overlay template that will link to the dynamic CSS
        return templates.TemplateResponse("overlay-custom.html", {"request": request, "theme_id": theme.id})

    # theme_name may come straight from the query string, so only render it
    # when it is a plain file name that actually exists in the templates
    # directory; anything else is reported as "not found".
    template_name = f"overlay-{theme_name}.html"
    is_plain_name = os.path.basename(template_name) == template_name
    if not is_plain_name or not os.path.isfile(os.path.join(TEMPLATES_DIR, template_name)):
        raise HTTPException(status_code=404, detail="Theme not found")

    return templates.TemplateResponse(template_name, {"request": request})
|
|
|
|
@app.post("/api/settings")
async def update_settings(settings_data: schemas.SettingsUpdate, request: Request, db: Session = Depends(auth.get_db)):
    """Store the overlay theme chosen by the logged-in user.

    Args:
        settings_data: Payload carrying the new ``overlay_theme``.
        request: The incoming request; its session identifies the user.
        db: Database session.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 401 if the session has no ``user_id`` or that id no
            longer matches a stored user.
    """
    user_id = request.session.get('user_id')
    if not user_id:
        raise HTTPException(status_code=401, detail="Not authenticated")

    user = db.query(models.User).filter(models.User.id == user_id).first()
    if user is None:
        # Stale session pointing at a missing user: treat as unauthenticated
        # rather than failing on `user.settings` below.
        raise HTTPException(status_code=401, detail="Not authenticated")

    if not user.settings:
        user.settings = models.Setting()

    user.settings.overlay_theme = settings_data.overlay_theme
    db.commit()

    return {"message": "Settings updated successfully"}
|
|
|
|
@app.post("/api/themes", response_model=schemas.CustomTheme)
async def create_theme(theme_data: schemas.CustomThemeCreate, request: Request, db: Session = Depends(auth.get_db)):
    """Create a custom theme owned by the logged-in user.

    Returns:
        The newly stored ``models.CustomTheme`` after it has been refreshed
        from the database.

    Raises:
        HTTPException: 401 if the session has no ``user_id``.
    """
    owner_id = request.session.get('user_id')
    if not owner_id:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Build the row from the submitted fields plus the session's user id.
    theme_fields = theme_data.dict()
    created_theme = models.CustomTheme(**theme_fields, owner_id=owner_id)

    db.add(created_theme)
    db.commit()
    # Reload so database-populated attributes are present on the result.
    db.refresh(created_theme)
    return created_theme
|
|
|
|
@app.delete("/api/themes/{theme_id}")
async def delete_theme(theme_id: int, request: Request, db: Session = Depends(auth.get_db)):
    """Delete one of the logged-in user's custom themes.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 401 if the session has no ``user_id``; 404 if no theme
            with ``theme_id`` is owned by that user.
    """
    session_user_id = request.session.get('user_id')
    if not session_user_id:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Match on both id and owner so users can only delete their own themes.
    owned_theme_query = db.query(models.CustomTheme).filter(
        models.CustomTheme.id == theme_id,
        models.CustomTheme.owner_id == session_user_id,
    )
    doomed_theme = owned_theme_query.first()
    if not doomed_theme:
        raise HTTPException(status_code=404, detail="Theme not found")

    db.delete(doomed_theme)
    db.commit()
    return {"message": "Theme deleted successfully"}
|
|
|
|
@app.get("/css/custom/{theme_id}")
async def get_custom_css(theme_id: int, db: Session = Depends(auth.get_db)):
    """Return the stored CSS of a custom theme as a ``text/css`` response.

    Raises:
        HTTPException: 404 if no custom theme has the given id.
    """
    theme_query = db.query(models.CustomTheme).filter(models.CustomTheme.id == theme_id)
    custom_theme = theme_query.first()
    if not custom_theme:
        raise HTTPException(status_code=404, detail="Custom theme not found")

    stylesheet = custom_theme.css_content
    return Response(content=stylesheet, media_type="text/css")
|
|
|
|
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int):
    """Register a websocket for ``user_id`` and hold it open until it ends.

    The socket is handed to the application's websocket manager, then
    incoming text frames are read and discarded in a loop. When the loop
    ends for any reason the socket is removed from the manager again.

    Args:
        websocket: The incoming websocket connection.
        user_id: Id of the user the connection belongs to.
    """
    manager = websocket.app.state.websocket_manager
    await manager.connect(user_id, websocket)
    try:
        while True:
            # Keep the connection alive
            await websocket.receive_text()
    except Exception:
        # Any receive failure simply ends the session; cleanup happens below.
        pass
    finally:
        # `finally` rather than `except` so the socket is also unregistered
        # when the handler task is cancelled (CancelledError is not an
        # Exception subclass).
        manager.disconnect(user_id, websocket)
        print(f"WebSocket for user {user_id} disconnected.")